You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/23 09:59:03 UTC
[1/2] camel git commit: CAMEL-9740 : Improve camel-infinispan
Repository: camel
Updated Branches:
refs/heads/camel-2.17.x 2363157a8 -> 0de49d45a
refs/heads/master ab388bf2e -> aaaea389d
CAMEL-9740 : Improve camel-infinispan
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/aaaea389
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/aaaea389
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/aaaea389
Branch: refs/heads/master
Commit: aaaea389d158682fd19b2f09a7ca406bbbcb6e3f
Parents: ab388bf
Author: lburgazzoli <lb...@gmail.com>
Authored: Fri Mar 11 10:48:26 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Mar 23 09:55:07 2016 +0100
----------------------------------------------------------------------
components/camel-infinispan/pom.xml | 2 +
.../infinispan/InfinispanConfiguration.java | 44 ++-
.../infinispan/InfinispanConstants.java | 2 +
.../infinispan/InfinispanConsumer.java | 19 +-
.../component/infinispan/InfinispanManager.java | 116 ++++++
.../infinispan/InfinispanOperation.java | 355 ++++++++++---------
.../infinispan/InfinispanProducer.java | 34 +-
.../component/infinispan/InfinispanUtil.java | 47 ++-
.../InfinispanAsyncLocalEventListener.java | 1 -
.../InfinispanConsumerEmbeddedHandler.java | 6 +-
.../InfinispanSyncClusteredEventListener.java | 9 +-
.../InfinispanSyncLocalEventListener.java | 7 +-
.../InfinispanIdempotentRepository.java | 17 +-
.../remote/InfinispanRemoteOperation.java | 6 +-
.../InfinispanClusterTestSupport.java | 6 +-
.../InfinispanConfigurationTestIT.java | 100 ++++++
.../test/resources/infinispan/client.properties | 20 ++
.../src/test/resources/log4j.xml | 70 ++++
18 files changed, 621 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/pom.xml b/components/camel-infinispan/pom.xml
index 7fcdb32..db95c94 100644
--- a/components/camel-infinispan/pom.xml
+++ b/components/camel-infinispan/pom.xml
@@ -254,6 +254,8 @@
<command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=static_filter_factory:add(configuration=default)</command>
<!-- Separate cache for @ClientListener(includeCurrentState=true) -->
<command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=include_current_state:add(configuration=default)</command>
+ <!-- Separate cache for protobuf serialized objects. -->
+ <command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=misc_cache:add(configuration=default)</command>
</commands>
</executeCommands>
</configuration>
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
index 473a267..0559f84 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
import org.apache.camel.util.ObjectHelper;
import org.infinispan.commons.api.BasicCacheContainer;
+import org.infinispan.context.Flag;
@UriParams
public class InfinispanConfiguration {
@@ -55,6 +56,11 @@ public class InfinispanConfiguration {
private boolean clusteredListener;
@UriParam
private InfinispanQueryBuilder queryBuilder;
+ @UriParam(label = "advanced", javaType = "java.lang.String")
+ private Flag[] flags;
+ @UriParam(label = "advanced")
+ private String configurationUri;
+
public String getCommand() {
return command;
@@ -149,7 +155,7 @@ public class InfinispanConfiguration {
* TRANSACTION_REGISTERED, CACHE_ENTRY_INVALIDATED, DATA_REHASHED, TOPOLOGY_CHANGED, PARTITION_STATUS_CHANGED
*/
public void setEventTypes(String eventTypes) {
- this.eventTypes = new HashSet<String>(Arrays.asList(eventTypes.split(",")));
+ this.eventTypes = new HashSet<>(Arrays.asList(eventTypes.split(",")));
}
/**
@@ -181,4 +187,40 @@ public class InfinispanConfiguration {
public boolean hasQueryBuilder() {
return queryBuilder != null;
}
+
+ public Flag[] getFlags() {
+ return flags;
+ }
+
+ /**
+ * A comma separated list of Flag to be applied by default on each cache
+ * invocation, not applicable to remote caches.
+ */
+ public void setFlags(String flagsAsString) {
+ String[] flagsArray = flagsAsString.split(",");
+ this.flags = new Flag[flagsArray.length];
+
+ for (int i = 0; i < flagsArray.length; i++) {
+ this.flags[i] = Flag.valueOf(flagsArray[i]);
+ }
+ }
+
+ public void setFlags(Flag... flags) {
+ this.flags = flags;
+ }
+
+ public boolean hasFlags() {
+ return flags != null && flags.length > 0;
+ }
+
+ /**
+ * An implementation specific URI for the CacheManager
+ */
+ public String getConfigurationUri() {
+ return configurationUri;
+ }
+
+ public void setConfigurationUri(String configurationUri) {
+ this.configurationUri = configurationUri;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
index 1a27fd7..d791968 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.infinispan;
public interface InfinispanConstants {
+ int OPERATION_LEN = InfinispanConstants.OPERATION.length();
+
String EVENT_TYPE = "CamelInfinispanEventType";
String IS_PRE = "CamelInfinispanIsPre";
String CACHE_NAME = "CamelInfinispanCacheName";
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
index b4db422..e60156a 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
@@ -25,7 +25,6 @@ import org.apache.camel.impl.DefaultConsumer;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.Search;
import org.infinispan.commons.api.BasicCache;
-import org.infinispan.commons.api.BasicCacheContainer;
import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.dsl.Query;
@@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory;
public class InfinispanConsumer extends DefaultConsumer {
private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class);
private final InfinispanConfiguration configuration;
+ private final InfinispanManager manager;
private InfinispanEventListener listener;
private InfinispanConsumerHandler consumerHandler;
private BasicCache<Object, Object> cache;
@@ -43,6 +43,7 @@ public class InfinispanConsumer extends DefaultConsumer {
public InfinispanConsumer(InfinispanEndpoint endpoint, Processor processor, InfinispanConfiguration configuration) {
super(endpoint, processor);
this.configuration = configuration;
+ this.manager = new InfinispanManager(endpoint.getCamelContext(), configuration);
}
public void processEvent(String eventType, boolean isPre, String cacheName, Object key) {
@@ -69,10 +70,9 @@ public class InfinispanConsumer extends DefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
+ manager.start();
- BasicCacheContainer cacheContainer = configuration.getCacheContainer();
- cache = InfinispanUtil.getCache(configuration.getCacheContainer(), configuration.getCacheName());
-
+ cache = manager.getCache();
if (configuration.hasQueryBuilder()) {
if (InfinispanUtil.isRemote(cache)) {
RemoteCache<Object, Object> remoteCache = InfinispanUtil.asRemote(cache);
@@ -81,15 +81,17 @@ public class InfinispanConsumer extends DefaultConsumer {
continuousQuery = Search.getContinuousQuery(remoteCache);
continuousQuery.addContinuousQueryListener(query, new ContinuousQueryEventListener(cache.getName()));
} else {
- throw new IllegalArgumentException("Can't run continuous queries against embedded cache (" + cache.getName() + ")");
+ throw new IllegalArgumentException(
+ "Can't run continuous queries against embedded cache (" + cache.getName() + ")");
}
} else {
- if (InfinispanUtil.isEmbedded(cacheContainer)) {
+ if (manager.isCacheContainerEmbedded()) {
consumerHandler = InfinispanConsumerEmbeddedHandler.INSTANCE;
- } else if (InfinispanUtil.isRemote(cacheContainer)) {
+ } else if (manager.isCacheContainerRemote()) {
consumerHandler = InfinispanConsumerRemoteHandler.INSTANCE;
} else {
- throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + cacheContainer);
+ throw new UnsupportedOperationException(
+ "Unsupported CacheContainer type " + manager.getCacheContainer().getClass().getName());
}
listener = consumerHandler.start(this);
@@ -106,6 +108,7 @@ public class InfinispanConsumer extends DefaultConsumer {
consumerHandler.stop(this);
}
+ manager.stop();
super.doStop();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
new file mode 100644
index 0000000..eab846b
--- /dev/null
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Service;
+import org.infinispan.cache.impl.DecoratedCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
+import org.infinispan.commons.api.BasicCache;
+import org.infinispan.commons.api.BasicCacheContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InfinispanManager implements Service {
+ private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanManager.class);
+
+ private final InfinispanConfiguration configuration;
+ private final CamelContext camelContext;
+ private BasicCacheContainer cacheContainer;
+ private boolean isManagedCacheContainer;
+
+
+ public InfinispanManager(InfinispanConfiguration configuration) {
+ this(null, configuration);
+ }
+
+ public InfinispanManager(CamelContext camelContext, InfinispanConfiguration configuration) {
+ this.camelContext = camelContext;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void start() throws Exception {
+ cacheContainer = configuration.getCacheContainer();
+ if (cacheContainer == null) {
+ String uri = configuration.getConfigurationUri();
+ if (uri != null && camelContext != null) {
+ uri = camelContext.resolvePropertyPlaceholders(uri);
+ }
+
+ ConfigurationBuilder configurationBuilder = new ConfigurationBuilder()
+ .classLoader(Thread.currentThread().getContextClassLoader());
+
+ if (uri != null) {
+ configurationBuilder.withProperties(InfinispanUtil.loadProperties(camelContext, uri));
+ }
+
+ cacheContainer = new RemoteCacheManager(
+ configurationBuilder
+ .addServers(configuration.getHost())
+ .build(),
+ true);
+
+ isManagedCacheContainer = true;
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ if (isManagedCacheContainer) {
+ cacheContainer.stop();
+ }
+ }
+
+ public BasicCacheContainer getCacheContainer() {
+ return cacheContainer;
+ }
+
+ public boolean isCacheContainerEmbedded() {
+ return InfinispanUtil.isEmbedded(cacheContainer);
+ }
+
+ public boolean isCacheContainerRemote() {
+ return InfinispanUtil.isRemote(cacheContainer);
+ }
+
+ public BasicCache<Object, Object> getCache() {
+ return getCache(configuration.getCacheName());
+ }
+
+ public BasicCache<Object, Object> getCache(Exchange exchange) {
+ return getCache(exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, String.class));
+ }
+
+ public BasicCache<Object, Object> getCache(String cacheName) {
+ if (cacheName == null) {
+ cacheName = configuration.getCacheName();
+ }
+
+ LOGGER.trace("Cache[{}]", cacheName);
+
+ BasicCache<Object, Object> cache = InfinispanUtil.getCache(cacheContainer, cacheName);
+ if (configuration.hasFlags() && InfinispanUtil.isEmbedded(cache)) {
+ cache = new DecoratedCache(InfinispanUtil.asAdvanced(cache), configuration.getFlags());
+ }
+
+ return cache;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
index c54fd54..7da8552 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
@@ -20,6 +20,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.component.infinispan.remote.InfinispanRemoteOperation;
import org.apache.camel.util.ObjectHelper;
import org.infinispan.commons.api.BasicCache;
@@ -28,7 +29,7 @@ import org.infinispan.query.dsl.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.camel.component.infinispan.InfinispanUtil.isInHeaderEmpty;
+import static org.apache.camel.component.infinispan.InfinispanUtil.isHeaderEmpty;
public final class InfinispanOperation {
private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanOperation.class);
@@ -37,12 +38,20 @@ public final class InfinispanOperation {
}
public static void process(Exchange exchange, InfinispanConfiguration configuration, BasicCache<Object, Object> cache) {
- Operation operation = getOperation(exchange, configuration);
- operation.execute(configuration, cache, exchange);
+ final Message in = exchange.getIn();
+
+ Operation operation = getOperation(in, configuration);
+ operation.execute(
+ configuration,
+ exchange.getIn().getHeader(InfinispanConstants.IGNORE_RETURN_VALUES) != null
+ ? cache
+ : InfinispanUtil.ignoreReturnValuesCache(cache),
+ in
+ );
}
- private static Operation getOperation(Exchange exchange, InfinispanConfiguration configuration) {
- String operation = exchange.getIn().getHeader(InfinispanConstants.OPERATION, String.class);
+ private static Operation getOperation(Message message, InfinispanConfiguration configuration) {
+ String operation = message.getHeader(InfinispanConstants.OPERATION, String.class);
if (operation == null) {
if (configuration.hasCommand()) {
operation = InfinispanConstants.OPERATION + configuration.getCommand();
@@ -51,310 +60,306 @@ public final class InfinispanOperation {
}
}
LOGGER.trace("Operation: [{}]", operation);
- return Operation.valueOf(operation.substring(InfinispanConstants.OPERATION.length()).toUpperCase());
+ return Operation.fromOperation(operation);
}
private enum Operation {
PUT {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
Object result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- result = cache.put(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ result = cache.put(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.put(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.put(getKey(message), getValue(message), lifespan, timeUnit);
}
} else {
- result = cache.put(getKey(exchange), getValue(exchange));
+ result = cache.put(getKey(message), getValue(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, PUTASYNC {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
NotifyingFuture result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ result = cache.putAsync(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.putAsync(getKey(message), getValue(message), lifespan, timeUnit);
}
} else {
- result = cache.putAsync(getKey(exchange), getValue(exchange));
+ result = cache.putAsync(getKey(message), getValue(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, PUTALL {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- cache.putAll(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ cache.putAll(getMap(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- cache.putAll(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ cache.putAll(getMap(message), lifespan, timeUnit);
}
} else {
- cache.putAll(getMap(exchange));
+ cache.putAll(getMap(message));
}
}
}, PUTALLASYNC {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
NotifyingFuture result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ result = cache.putAllAsync(getMap(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.putAllAsync(getMap(message), lifespan, timeUnit);
}
} else {
- result = cache.putAllAsync(getMap(exchange));
+ result = cache.putAllAsync(getMap(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, PUTIFABSENT {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
Object result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- result = cache.putIfAbsent(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ result = cache.putIfAbsent(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.putIfAbsent(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.putIfAbsent(getKey(message), getValue(message), lifespan, timeUnit);
}
} else {
- result = cache.putIfAbsent(getKey(exchange), getValue(exchange));
+ result = cache.putIfAbsent(getKey(message), getValue(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, PUTIFABSENTASYNC {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
NotifyingFuture result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ result = cache.putIfAbsentAsync(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.putIfAbsentAsync(getKey(message), getValue(message), lifespan, timeUnit);
}
} else {
- result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange));
+ result = cache.putIfAbsentAsync(getKey(message), getValue(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, GET {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- setResult(cache.get(getKey(exchange)), exchange);
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ setResult(cache.get(getKey(message)), message);
}
}, CONTAINSKEY {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- setResult(cache.containsKey(getKey(exchange)), exchange);
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ setResult(cache.containsKey(getKey(message)), message);
}
}, CONTAINSVALUE {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- Object result = cache.containsValue(getValue(exchange));
- setResult(result, exchange);
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ setResult(cache.containsValue(getValue(message)), message);
}
}, REMOVE {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
Object result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (ObjectHelper.isEmpty(getValue(exchange))) {
- result = cache.remove(getKey(exchange));
+ if (ObjectHelper.isEmpty(getValue(message))) {
+ result = cache.remove(getKey(message));
} else {
- result = cache.remove(getKey(exchange), getValue(exchange));
+ result = cache.remove(getKey(message), getValue(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, REMOVEASYNC {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
NotifyingFuture result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (ObjectHelper.isEmpty(getValue(exchange))) {
- result = cache.removeAsync(getKey(exchange));
+ if (ObjectHelper.isEmpty(getValue(message))) {
+ result = cache.removeAsync(getKey(message));
} else {
- result = cache.removeAsync(getKey(exchange), getValue(exchange));
+ result = cache.removeAsync(getKey(message), getValue(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, REPLACE {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
Object result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- if (ObjectHelper.isEmpty(getOldValue(exchange))) {
- result = cache.replace(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ if (ObjectHelper.isEmpty(getOldValue(message))) {
+ result = cache.replace(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ result = cache.replace(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
}
} else {
- if (ObjectHelper.isEmpty(getOldValue(exchange))) {
- result = cache.replace(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ if (ObjectHelper.isEmpty(getOldValue(message))) {
+ result = cache.replace(getKey(message), getValue(message), lifespan, timeUnit);
} else {
- result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.replace(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit);
}
}
} else {
- if (ObjectHelper.isEmpty(getOldValue(exchange))) {
- result = cache.replace(getKey(exchange), getValue(exchange));
+ if (ObjectHelper.isEmpty(getOldValue(message))) {
+ result = cache.replace(getKey(message), getValue(message));
} else {
- result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange));
+ result = cache.replace(getKey(message), getOldValue(message), getValue(message));
}
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, REPLACEASYNC {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
NotifyingFuture result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- if (ObjectHelper.isEmpty(getOldValue(exchange))) {
- result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ if (ObjectHelper.isEmpty(getOldValue(message))) {
+ result = cache.replaceAsync(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ result = cache.replaceAsync(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
}
} else {
- if (ObjectHelper.isEmpty(getOldValue(exchange))) {
- result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ if (ObjectHelper.isEmpty(getOldValue(message))) {
+ result = cache.replaceAsync(getKey(message), getValue(message), lifespan, timeUnit);
} else {
- result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.replaceAsync(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit);
}
}
} else {
- if (ObjectHelper.isEmpty(getOldValue(exchange))) {
- result = cache.replaceAsync(getKey(exchange), getValue(exchange));
+ if (ObjectHelper.isEmpty(getOldValue(message))) {
+ result = cache.replaceAsync(getKey(message), getValue(message));
} else {
- result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange));
+ result = cache.replaceAsync(getKey(message), getOldValue(message), getValue(message));
}
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, SIZE {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- setResult(cache.size(), exchange);
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ setResult(cache.size(), message);
}
}, CLEAR {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
cache.clear();
}
}, CLEARASYNC {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- setResult(cache.clearAsync(), exchange);
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ setResult(cache.clearAsync(), message);
}
}, QUERY {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- Query query = getQuery(configuration, cache, exchange);
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ Query query = getQuery(configuration, cache, message);
if (query == null) {
return;
}
- setResult(query.list(), exchange);
+ setResult(query.list(), message);
}
};
- void setResult(Object result, Exchange exchange) {
- exchange.getIn().setHeader(InfinispanConstants.RESULT, result);
+ private static final Operation[] OPERATIONS = values();
+
+ void setResult(Object result, Message message) {
+ message.setHeader(InfinispanConstants.RESULT, result);
}
- Object getKey(Exchange exchange) {
- return exchange.getIn().getHeader(InfinispanConstants.KEY);
+ Object getKey(Message message) {
+ return message.getHeader(InfinispanConstants.KEY);
}
- Object getValue(Exchange exchange) {
- return exchange.getIn().getHeader(InfinispanConstants.VALUE);
+ Object getValue(Message message) {
+ return message.getHeader(InfinispanConstants.VALUE);
}
- Object getOldValue(Exchange exchange) {
- return exchange.getIn().getHeader(InfinispanConstants.OLD_VALUE);
+ Object getOldValue(Message message) {
+ return message.getHeader(InfinispanConstants.OLD_VALUE);
}
- Map<? extends Object, ? extends Object> getMap(Exchange exchange) {
- return (Map<? extends Object, ? extends Object>) exchange.getIn().getHeader(InfinispanConstants.MAP);
+ Map<? extends Object, ? extends Object> getMap(Message message) {
+ return (Map<? extends Object, ? extends Object>) message.getHeader(InfinispanConstants.MAP);
}
- Query getQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ Query getQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
if (InfinispanUtil.isRemote(cache)) {
- return InfinispanRemoteOperation.buildQuery(configuration, cache, exchange);
+ return InfinispanRemoteOperation.buildQuery(configuration, cache, message);
} else {
return null;
}
}
- abstract void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange);
+ boolean hasLifespan(Message message) {
+ return !isHeaderEmpty(message, InfinispanConstants.LIFESPAN_TIME)
+ && !isHeaderEmpty(message, InfinispanConstants.LIFESPAN_TIME_UNIT);
+ }
+
+ boolean hasMaxIdleTime(Message message) {
+ return !isHeaderEmpty(message, InfinispanConstants.MAX_IDLE_TIME)
+ && !isHeaderEmpty(message, InfinispanConstants.MAX_IDLE_TIME_UNIT);
+ }
+
+ abstract void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message);
+
+ public static Operation fromOperation(String operation) {
+ int len;
+ String name;
+
+ for (int i = OPERATIONS.length - 1; i >= 0; i--) {
+ name = OPERATIONS[i].name();
+ len = name.length();
+ if (len == operation.length() - InfinispanConstants.OPERATION_LEN) {
+ if (name.regionMatches(true, 0, operation, InfinispanConstants.OPERATION_LEN, len)) {
+ return OPERATIONS[i];
+ }
+ }
+ }
+
+ throw new IllegalArgumentException("Unknown Operation for string: " + operation);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
index 31e1e16..6fbd1c1 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
@@ -18,55 +18,31 @@ package org.apache.camel.component.infinispan;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
-import org.infinispan.client.hotrod.RemoteCacheManager;
-import org.infinispan.client.hotrod.configuration.Configuration;
-import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
-import org.infinispan.commons.api.BasicCache;
-import org.infinispan.commons.api.BasicCacheContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class InfinispanProducer extends DefaultProducer {
- private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class);
private final InfinispanConfiguration configuration;
- private BasicCacheContainer cacheContainer;
- private boolean isManagedCacheContainer;
+ private final InfinispanManager manager;
public InfinispanProducer(InfinispanEndpoint endpoint, InfinispanConfiguration configuration) {
super(endpoint);
this.configuration = configuration;
+ this.manager = new InfinispanManager(endpoint.getCamelContext(), configuration);
}
@Override
public void process(Exchange exchange) throws Exception {
- InfinispanOperation.process(exchange, configuration, getCache(exchange));
+ InfinispanOperation.process(exchange, configuration, manager.getCache(exchange));
}
@Override
protected void doStart() throws Exception {
- cacheContainer = configuration.getCacheContainer();
- if (cacheContainer == null) {
- Configuration config = new ConfigurationBuilder().classLoader(Thread.currentThread().getContextClassLoader()).addServers(configuration.getHost()).build();
- cacheContainer = new RemoteCacheManager(config, true);
- isManagedCacheContainer = true;
- }
super.doStart();
+ manager.start();
}
@Override
protected void doStop() throws Exception {
- if (isManagedCacheContainer) {
- cacheContainer.stop();
- }
+ manager.stop();
super.doStop();
}
-
- private BasicCache<Object, Object> getCache(Exchange exchange) {
- String cacheName = exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, String.class);
- if (cacheName == null) {
- cacheName = configuration.getCacheName();
- }
- LOGGER.trace("Cache[{}]", cacheName);
- return cacheName != null ? cacheContainer.getCache(cacheName) : cacheContainer.getCache();
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
index 95a5d04..3bc6c2a 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
@@ -16,8 +16,17 @@
*/
package org.apache.camel.component.infinispan;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ResourceHelper;
+import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
@@ -68,10 +77,20 @@ public final class InfinispanUtil {
}
@SuppressWarnings("unchecked")
+ public static <K, V> Cache<K, V> asEmbedded(BasicCache<K, V> cache) {
+ return Cache.class.cast(cache);
+ }
+
+ @SuppressWarnings("unchecked")
public static <K, V> RemoteCache<K, V> asRemote(BasicCache<K, V> cache) {
return RemoteCache.class.cast(cache);
}
+ @SuppressWarnings("unchecked")
+ public static <K, V> AdvancedCache<K, V> asAdvanced(BasicCache<K, V> cache) {
+ return Cache.class.cast(cache).getAdvancedCache();
+ }
+
public static <K, V> BasicCache<K, V> ignoreReturnValuesCache(BasicCache<K, V> cache) {
if (isEmbedded(cache)) {
return ((Cache<K, V>) cache).getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES);
@@ -81,10 +100,36 @@ public final class InfinispanUtil {
}
public static boolean isInHeaderEmpty(Exchange exchange, String header) {
- return ObjectHelper.isEmpty(exchange.getIn().getHeader(header));
+ return isHeaderEmpty(exchange.getIn(), header);
+ }
+
+ public static boolean isHeaderEmpty(Message message, String header) {
+ return ObjectHelper.isEmpty(message.getHeader(header));
}
public static BasicCache<Object, Object> getCache(BasicCacheContainer cacheContainer, String cacheName) {
return ObjectHelper.isEmpty(cacheName) ? cacheContainer.getCache() : cacheContainer.getCache(cacheName);
}
+
+ public static Properties loadProperties(CamelContext camelContext, String uri) throws IOException {
+ if (camelContext != null) {
+ try (InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, uri)) {
+ Properties properties = new Properties();
+ properties.load(is);
+
+ return properties;
+ } catch (IOException e) {
+ }
+ }
+
+ try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(uri)) {
+ Properties properties = new Properties();
+ properties.load(is);
+
+ return properties;
+ } catch (IOException e) {
+ }
+
+ throw new FileNotFoundException("Cannot find resource: " + uri);
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
index 4f50950..0d6f630 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
@@ -22,7 +22,6 @@ import org.infinispan.notifications.Listener;
@Listener(clustered = false, sync = false)
public class InfinispanAsyncLocalEventListener extends InfinispanSyncLocalEventListener {
-
public InfinispanAsyncLocalEventListener(InfinispanConsumer consumer, Set<String> eventTypes) {
super(consumer, eventTypes);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
index 113c073..d4a70ca 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
@@ -20,6 +20,7 @@ import org.apache.camel.component.infinispan.InfinispanConfiguration;
import org.apache.camel.component.infinispan.InfinispanConsumer;
import org.apache.camel.component.infinispan.InfinispanConsumerHandler;
import org.apache.camel.component.infinispan.InfinispanEventListener;
+import org.apache.camel.component.infinispan.InfinispanUtil;
import org.infinispan.Cache;
public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsumerHandler {
@@ -30,7 +31,7 @@ public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsum
@Override
public InfinispanEventListener start(InfinispanConsumer consumer) {
- Cache<?, ?> embeddedCache = (Cache<?, ?>) consumer.getCache();
+ Cache<?, ?> embeddedCache = InfinispanUtil.asEmbedded(consumer.getCache());
InfinispanConfiguration configuration = consumer.getConfiguration();
InfinispanEventListener listener;
if (configuration.hasCustomListener()) {
@@ -55,8 +56,7 @@ public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsum
@Override
public void stop(InfinispanConsumer consumer) {
- Cache<?, ?> embeddedCache = (Cache<?, ?>) consumer.getCache();
+ Cache<?, ?> embeddedCache = InfinispanUtil.asEmbedded(consumer.getCache());
embeddedCache.removeListener(consumer.getListener());
}
-
}
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
index bcf6957..14fac43 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.infinispan.embedded;
import java.util.Set;
+
import org.apache.camel.component.infinispan.InfinispanConsumer;
import org.apache.camel.component.infinispan.InfinispanEventListener;
import org.infinispan.notifications.Listener;
@@ -25,16 +26,12 @@ import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Listener(clustered = true, sync = true)
public class InfinispanSyncClusteredEventListener extends InfinispanEventListener {
- /*clustered listeners only listen for post events*/
+ // clustered listeners only listen for post events
private static final boolean IS_PRE = false;
- private final transient Logger logger = LoggerFactory.getLogger(this.getClass());
-
public InfinispanSyncClusteredEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
super(infinispanConsumer, eventTypes);
}
@@ -44,8 +41,6 @@ public class InfinispanSyncClusteredEventListener extends InfinispanEventListene
@CacheEntryRemoved
@CacheEntryExpired
public void processEvent(CacheEntryEvent<Object, Object> event) {
- logger.trace("Received CacheEntryEvent [{}]", event);
-
if (isAccepted(event.getType().toString())) {
infinispanConsumer.processEvent(event.getType().toString(), IS_PRE, event.getCache().getName(), event.getKey());
}
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
index 1cf9325..e8112bd 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.infinispan.embedded;
import java.util.Set;
+
import org.apache.camel.component.infinispan.InfinispanConsumer;
import org.apache.camel.component.infinispan.InfinispanEventListener;
import org.infinispan.notifications.Listener;
@@ -30,13 +31,9 @@ import org.infinispan.notifications.cachelistener.annotation.CacheEntryPassivate
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Listener(clustered = false, sync = true)
public class InfinispanSyncLocalEventListener extends InfinispanEventListener {
- private final transient Logger logger = LoggerFactory.getLogger(this.getClass());
-
public InfinispanSyncLocalEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
super(infinispanConsumer, eventTypes);
}
@@ -51,8 +48,6 @@ public class InfinispanSyncLocalEventListener extends InfinispanEventListener {
@CacheEntryVisited
@CacheEntryExpired
public void processEvent(CacheEntryEvent<Object, Object> event) {
- logger.trace("Received CacheEntryEvent [{}]", event);
-
dispatch(event.getType().toString(), event.isPre(), event.getCache().getName(), event.getKey());
}
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
index e7838b3..b28a460 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
@@ -29,17 +29,19 @@ import org.infinispan.manager.DefaultCacheManager;
public class InfinispanIdempotentRepository extends ServiceSupport implements IdempotentRepository<Object> {
private final String cacheName;
private final BasicCacheContainer cacheContainer;
- private boolean isManagedCacheContainer;
+ private final boolean isManagedCacheContainer;
+ private BasicCache<Object, Boolean> cache;
public InfinispanIdempotentRepository(BasicCacheContainer cacheContainer, String cacheName) {
this.cacheContainer = cacheContainer;
this.cacheName = cacheName;
+ this.isManagedCacheContainer = false;
}
public InfinispanIdempotentRepository(String cacheName) {
- cacheContainer = new DefaultCacheManager();
+ this.cacheContainer = new DefaultCacheManager();
this.cacheName = cacheName;
- isManagedCacheContainer = true;
+ this.isManagedCacheContainer = true;
}
public InfinispanIdempotentRepository() {
@@ -106,16 +108,21 @@ public class InfinispanIdempotentRepository extends ServiceSupport implements Id
@Override
protected void doShutdown() throws Exception {
- super.doShutdown();
if (isManagedCacheContainer) {
cacheContainer.stop();
}
+
+ super.doShutdown();
}
private BasicCache<Object, Boolean> getCache() {
- return cacheName != null
+ if (cache == null) {
+ cache = cacheName != null
? cacheContainer.<Object, Boolean>getCache(cacheName)
: cacheContainer.<Object, Boolean>getCache();
+ }
+
+ return cache;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
index dd0f590..42b3cec 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
@@ -16,7 +16,7 @@
*/
package org.apache.camel.component.infinispan.remote;
-import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.component.infinispan.InfinispanConfiguration;
import org.apache.camel.component.infinispan.InfinispanConstants;
import org.apache.camel.component.infinispan.InfinispanQueryBuilder;
@@ -30,8 +30,8 @@ public final class InfinispanRemoteOperation {
private InfinispanRemoteOperation() {
}
- public static Query buildQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- InfinispanQueryBuilder queryBuilder = exchange.getIn().getHeader(InfinispanConstants.QUERY_BUILDER, InfinispanQueryBuilder.class);
+ public static Query buildQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ InfinispanQueryBuilder queryBuilder = message.getHeader(InfinispanConstants.QUERY_BUILDER, InfinispanQueryBuilder.class);
if (queryBuilder == null) {
queryBuilder = configuration.getQueryBuilder();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
index b47b1db..747a9e4 100644
--- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.infinispan;
import java.util.List;
+
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.infinispan.Cache;
@@ -90,15 +91,18 @@ public class InfinispanClusterTestSupport extends CamelTestSupport {
} catch (Throwable ex) {
throw new Exception(ex);
}
+
super.setUp();
}
@Override
public void tearDown() throws Exception {
+ super.tearDown();
+
+ // Has to be done later, maybe CamelTestSupport should
for (BasicCacheContainer container: clusteredCacheContainers) {
container.stop();
}
- super.tearDown();
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java
new file mode 100644
index 0000000..48c7772
--- /dev/null
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+import org.infinispan.cache.impl.DecoratedCache;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.commons.api.BasicCache;
+import org.infinispan.manager.DefaultCacheManager;
+import org.jgroups.util.UUID;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class InfinispanConfigurationTestIT {
+
+ @Test
+ public void embeddedCacheWithFlagsTest() throws Exception {
+ InfinispanConfiguration configuration = new InfinispanConfiguration();
+ configuration.setHost("localhost");
+ configuration.setCacheName("misc_cache");
+ configuration.setCacheContainer(new DefaultCacheManager(true));
+ configuration.setFlags(
+ org.infinispan.context.Flag.SKIP_CACHE_LOAD,
+ org.infinispan.context.Flag.SKIP_CACHE_STORE
+ );
+
+ InfinispanManager manager = new InfinispanManager(configuration);
+ manager.start();
+
+ BasicCache<Object, Object> cache = manager.getCache();
+ assertNotNull(cache);
+ assertTrue(cache instanceof DecoratedCache);
+
+ DecoratedCache<Object, Object> decoratedCache = (DecoratedCache<Object, Object>)cache;
+ assertTrue(decoratedCache.getFlags().contains(org.infinispan.context.Flag.SKIP_CACHE_LOAD));
+ assertTrue(decoratedCache.getFlags().contains(org.infinispan.context.Flag.SKIP_CACHE_STORE));
+
+ manager.getCacheContainer().stop();
+ manager.stop();
+ }
+
+ @Test
+ public void remoteCacheWithoutProperties() throws Exception {
+ InfinispanConfiguration configuration = new InfinispanConfiguration();
+ configuration.setHost("localhost");
+ configuration.setCacheName("misc_cache");
+
+ InfinispanManager manager = new InfinispanManager(configuration);
+ manager.start();
+
+ BasicCache<Object, Object> cache = manager.getCache();
+ assertNotNull(cache);
+ assertTrue(cache instanceof RemoteCache);
+
+ RemoteCache<Object, Object> remoteCache = InfinispanUtil.asRemote(cache);
+
+ String key = UUID.randomUUID().toString();
+ assertNull(remoteCache.put(key, "val1"));
+ assertNull(remoteCache.put(key, "val2"));
+
+ manager.stop();
+ }
+
+ @Test
+ public void remoteCacheWithPropertiesTest() throws Exception {
+ InfinispanConfiguration configuration = new InfinispanConfiguration();
+ configuration.setHost("localhost");
+ configuration.setCacheName("misc_cache");
+ configuration.setConfigurationUri("infinispan/client.properties");
+
+ InfinispanManager manager = new InfinispanManager(configuration);
+ manager.start();
+
+ BasicCache<Object, Object> cache = manager.getCache();
+ assertNotNull(cache);
+ assertTrue(cache instanceof RemoteCache);
+
+ String key = UUID.randomUUID().toString();
+ assertNull(cache.put(key, "val1"));
+ assertNotNull(cache.put(key, "val2"));
+
+ manager.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/test/resources/infinispan/client.properties
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/resources/infinispan/client.properties b/components/camel-infinispan/src/test/resources/infinispan/client.properties
new file mode 100644
index 0000000..4cacbe2
--- /dev/null
+++ b/components/camel-infinispan/src/test/resources/infinispan/client.properties
@@ -0,0 +1,20 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+# Whether or not to implicitly Flag.FORCE_RETURN_VALUE for all calls.
+# The default value for this property is false.
+infinispan.client.hotrod.force_return_values = true
http://git-wip-us.apache.org/repos/asf/camel/blob/aaaea389/components/camel-infinispan/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/resources/log4j.xml b/components/camel-infinispan/src/test/resources/log4j.xml
new file mode 100644
index 0000000..71a2b38
--- /dev/null
+++ b/components/camel-infinispan/src/test/resources/log4j.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+ <appender name="FILE" class="org.apache.log4j.FileAppender">
+ <param name="File" value="target/camel-infinispan-test.log" />
+ <param name="Append" value="true" />
+ <param name="Threshold" value="INFO" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d [%-15.15t] %-5p %-30.30c{1} - %m%n" />
+ </layout>
+ </appender>
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Threshold" value="INFO" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%30.30t] %-30.30c{1} %-5p %m%n" />
+ </layout>
+ </appender>
+
+ <!-- ================ -->
+ <!-- Limit categories -->
+ <!-- ================ -->
+ <category name="org.jboss.arquillian">
+ <priority value="INFO" />
+ </category>
+
+ <category name="org.infinispan.arquillian.core.WithRunningServerObserver">
+ <priority value="INFO" />
+ </category>
+
+ <category name="org.infinispan.server.test.util.TestsuiteListener">
+ <priority value="INFO" />
+ </category>
+
+ <category name="org.apache.http">
+ <priority value="INFO" />
+ </category>
+
+ <category name="org.infinispan">
+ <priority value="INFO" />
+ </category>
+
+ <category name="org.jgroups">
+ <priority value="INFO" />
+ </category>
+
+ <category name="org.apache.commons.httpclient.auth">
+ <priority value="INFO"/>
+ </category>
+
+ <category name="org.apache.auth">
+ <priority value="INFO"/>
+ </category>
+
+ <category name="org.apache.directory">
+ <priority value="WARN"/>
+ </category>
+
+ <!-- ======================= -->
+ <!-- Setup the Root category -->
+ <!-- ======================= -->
+
+ <root>
+ <priority value="INFO" />
+ <appender-ref ref="FILE" />
+ </root>
+
+</log4j:configuration>
[2/2] camel git commit: CAMEL-9740 : Improve camel-infinispan
Posted by da...@apache.org.
CAMEL-9740 : Improve camel-infinispan
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0de49d45
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0de49d45
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0de49d45
Branch: refs/heads/camel-2.17.x
Commit: 0de49d45a4fe11f4d6ec7022208b306f24fc8905
Parents: 2363157
Author: lburgazzoli <lb...@gmail.com>
Authored: Fri Mar 11 10:48:26 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Mar 23 09:58:54 2016 +0100
----------------------------------------------------------------------
components/camel-infinispan/pom.xml | 2 +
.../infinispan/InfinispanConfiguration.java | 44 ++-
.../infinispan/InfinispanConstants.java | 2 +
.../infinispan/InfinispanConsumer.java | 19 +-
.../component/infinispan/InfinispanManager.java | 116 ++++++
.../infinispan/InfinispanOperation.java | 355 ++++++++++---------
.../infinispan/InfinispanProducer.java | 34 +-
.../component/infinispan/InfinispanUtil.java | 47 ++-
.../InfinispanAsyncLocalEventListener.java | 1 -
.../InfinispanConsumerEmbeddedHandler.java | 6 +-
.../InfinispanSyncClusteredEventListener.java | 9 +-
.../InfinispanSyncLocalEventListener.java | 7 +-
.../InfinispanIdempotentRepository.java | 17 +-
.../remote/InfinispanRemoteOperation.java | 6 +-
.../InfinispanClusterTestSupport.java | 6 +-
.../InfinispanConfigurationTestIT.java | 100 ++++++
.../test/resources/infinispan/client.properties | 20 ++
.../src/test/resources/log4j.xml | 70 ++++
18 files changed, 621 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/pom.xml b/components/camel-infinispan/pom.xml
index e7eb14a..ff4eb99 100644
--- a/components/camel-infinispan/pom.xml
+++ b/components/camel-infinispan/pom.xml
@@ -254,6 +254,8 @@
<command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=static_filter_factory:add(configuration=default)</command>
<!-- Separate cache for @ClientListener(includeCurrentState=true) -->
<command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=include_current_state:add(configuration=default)</command>
+ <!-- Separate cache for protobuf serialized objects. -->
+ <command>/subsystem=datagrid-infinispan/cache-container=local/local-cache=misc_cache:add(configuration=default)</command>
</commands>
</executeCommands>
</configuration>
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
index 473a267..0559f84 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
import org.apache.camel.util.ObjectHelper;
import org.infinispan.commons.api.BasicCacheContainer;
+import org.infinispan.context.Flag;
@UriParams
public class InfinispanConfiguration {
@@ -55,6 +56,11 @@ public class InfinispanConfiguration {
private boolean clusteredListener;
@UriParam
private InfinispanQueryBuilder queryBuilder;
+ @UriParam(label = "advanced", javaType = "java.lang.String")
+ private Flag[] flags;
+ @UriParam(label = "advanced")
+ private String configurationUri;
+
public String getCommand() {
return command;
@@ -149,7 +155,7 @@ public class InfinispanConfiguration {
* TRANSACTION_REGISTERED, CACHE_ENTRY_INVALIDATED, DATA_REHASHED, TOPOLOGY_CHANGED, PARTITION_STATUS_CHANGED
*/
public void setEventTypes(String eventTypes) {
- this.eventTypes = new HashSet<String>(Arrays.asList(eventTypes.split(",")));
+ this.eventTypes = new HashSet<>(Arrays.asList(eventTypes.split(",")));
}
/**
@@ -181,4 +187,40 @@ public class InfinispanConfiguration {
public boolean hasQueryBuilder() {
return queryBuilder != null;
}
+
+ public Flag[] getFlags() {
+ return flags;
+ }
+
+ /**
+ * A comma separated list of Flag to be applied by default on each cache
+ * invocation, not applicable to remote caches.
+ */
+ public void setFlags(String flagsAsString) {
+ String[] flagsArray = flagsAsString.split(",");
+ this.flags = new Flag[flagsArray.length];
+
+ for (int i = 0; i < flagsArray.length; i++) {
+ this.flags[i] = Flag.valueOf(flagsArray[i]);
+ }
+ }
+
+ public void setFlags(Flag... flags) {
+ this.flags = flags;
+ }
+
+ public boolean hasFlags() {
+ return flags != null && flags.length > 0;
+ }
+
+ /**
+ * An implementation specific URI for the CacheManager
+ */
+ public String getConfigurationUri() {
+ return configurationUri;
+ }
+
+ public void setConfigurationUri(String configurationUri) {
+ this.configurationUri = configurationUri;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
index 1a27fd7..d791968 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.infinispan;
public interface InfinispanConstants {
+ int OPERATION_LEN = InfinispanConstants.OPERATION.length();
+
String EVENT_TYPE = "CamelInfinispanEventType";
String IS_PRE = "CamelInfinispanIsPre";
String CACHE_NAME = "CamelInfinispanCacheName";
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
index b4db422..e60156a 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
@@ -25,7 +25,6 @@ import org.apache.camel.impl.DefaultConsumer;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.Search;
import org.infinispan.commons.api.BasicCache;
-import org.infinispan.commons.api.BasicCacheContainer;
import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.dsl.Query;
@@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory;
public class InfinispanConsumer extends DefaultConsumer {
private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class);
private final InfinispanConfiguration configuration;
+ private final InfinispanManager manager;
private InfinispanEventListener listener;
private InfinispanConsumerHandler consumerHandler;
private BasicCache<Object, Object> cache;
@@ -43,6 +43,7 @@ public class InfinispanConsumer extends DefaultConsumer {
public InfinispanConsumer(InfinispanEndpoint endpoint, Processor processor, InfinispanConfiguration configuration) {
super(endpoint, processor);
this.configuration = configuration;
+ this.manager = new InfinispanManager(endpoint.getCamelContext(), configuration);
}
public void processEvent(String eventType, boolean isPre, String cacheName, Object key) {
@@ -69,10 +70,9 @@ public class InfinispanConsumer extends DefaultConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
+ manager.start();
- BasicCacheContainer cacheContainer = configuration.getCacheContainer();
- cache = InfinispanUtil.getCache(configuration.getCacheContainer(), configuration.getCacheName());
-
+ cache = manager.getCache();
if (configuration.hasQueryBuilder()) {
if (InfinispanUtil.isRemote(cache)) {
RemoteCache<Object, Object> remoteCache = InfinispanUtil.asRemote(cache);
@@ -81,15 +81,17 @@ public class InfinispanConsumer extends DefaultConsumer {
continuousQuery = Search.getContinuousQuery(remoteCache);
continuousQuery.addContinuousQueryListener(query, new ContinuousQueryEventListener(cache.getName()));
} else {
- throw new IllegalArgumentException("Can't run continuous queries against embedded cache (" + cache.getName() + ")");
+ throw new IllegalArgumentException(
+ "Can't run continuous queries against embedded cache (" + cache.getName() + ")");
}
} else {
- if (InfinispanUtil.isEmbedded(cacheContainer)) {
+ if (manager.isCacheContainerEmbedded()) {
consumerHandler = InfinispanConsumerEmbeddedHandler.INSTANCE;
- } else if (InfinispanUtil.isRemote(cacheContainer)) {
+ } else if (manager.isCacheContainerRemote()) {
consumerHandler = InfinispanConsumerRemoteHandler.INSTANCE;
} else {
- throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + cacheContainer);
+ throw new UnsupportedOperationException(
+ "Unsupported CacheContainer type " + manager.getCacheContainer().getClass().getName());
}
listener = consumerHandler.start(this);
@@ -106,6 +108,7 @@ public class InfinispanConsumer extends DefaultConsumer {
consumerHandler.stop(this);
}
+ manager.stop();
super.doStop();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
new file mode 100644
index 0000000..eab846b
--- /dev/null
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Service;
+import org.infinispan.cache.impl.DecoratedCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
+import org.infinispan.commons.api.BasicCache;
+import org.infinispan.commons.api.BasicCacheContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InfinispanManager implements Service {
+ private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanManager.class);
+
+ private final InfinispanConfiguration configuration;
+ private final CamelContext camelContext;
+ private BasicCacheContainer cacheContainer;
+ private boolean isManagedCacheContainer;
+
+
+ public InfinispanManager(InfinispanConfiguration configuration) {
+ this(null, configuration);
+ }
+
+ public InfinispanManager(CamelContext camelContext, InfinispanConfiguration configuration) {
+ this.camelContext = camelContext;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void start() throws Exception {
+ cacheContainer = configuration.getCacheContainer();
+ if (cacheContainer == null) {
+ String uri = configuration.getConfigurationUri();
+ if (uri != null && camelContext != null) {
+ uri = camelContext.resolvePropertyPlaceholders(uri);
+ }
+
+ ConfigurationBuilder configurationBuilder = new ConfigurationBuilder()
+ .classLoader(Thread.currentThread().getContextClassLoader());
+
+ if (uri != null) {
+ configurationBuilder.withProperties(InfinispanUtil.loadProperties(camelContext, uri));
+ }
+
+ cacheContainer = new RemoteCacheManager(
+ configurationBuilder
+ .addServers(configuration.getHost())
+ .build(),
+ true);
+
+ isManagedCacheContainer = true;
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ if (isManagedCacheContainer) {
+ cacheContainer.stop();
+ }
+ }
+
+ public BasicCacheContainer getCacheContainer() {
+ return cacheContainer;
+ }
+
+ public boolean isCacheContainerEmbedded() {
+ return InfinispanUtil.isEmbedded(cacheContainer);
+ }
+
+ public boolean isCacheContainerRemote() {
+ return InfinispanUtil.isRemote(cacheContainer);
+ }
+
+ public BasicCache<Object, Object> getCache() {
+ return getCache(configuration.getCacheName());
+ }
+
+ public BasicCache<Object, Object> getCache(Exchange exchange) {
+ return getCache(exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, String.class));
+ }
+
+ public BasicCache<Object, Object> getCache(String cacheName) {
+ if (cacheName == null) {
+ cacheName = configuration.getCacheName();
+ }
+
+ LOGGER.trace("Cache[{}]", cacheName);
+
+ BasicCache<Object, Object> cache = InfinispanUtil.getCache(cacheContainer, cacheName);
+ if (configuration.hasFlags() && InfinispanUtil.isEmbedded(cache)) {
+ cache = new DecoratedCache(InfinispanUtil.asAdvanced(cache), configuration.getFlags());
+ }
+
+ return cache;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
index c54fd54..7da8552 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanOperation.java
@@ -20,6 +20,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.component.infinispan.remote.InfinispanRemoteOperation;
import org.apache.camel.util.ObjectHelper;
import org.infinispan.commons.api.BasicCache;
@@ -28,7 +29,7 @@ import org.infinispan.query.dsl.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.camel.component.infinispan.InfinispanUtil.isInHeaderEmpty;
+import static org.apache.camel.component.infinispan.InfinispanUtil.isHeaderEmpty;
public final class InfinispanOperation {
private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanOperation.class);
@@ -37,12 +38,20 @@ public final class InfinispanOperation {
}
public static void process(Exchange exchange, InfinispanConfiguration configuration, BasicCache<Object, Object> cache) {
- Operation operation = getOperation(exchange, configuration);
- operation.execute(configuration, cache, exchange);
+ final Message in = exchange.getIn();
+
+ Operation operation = getOperation(in, configuration);
+ operation.execute(
+ configuration,
+ exchange.getIn().getHeader(InfinispanConstants.IGNORE_RETURN_VALUES) != null
+ ? cache
+ : InfinispanUtil.ignoreReturnValuesCache(cache),
+ in
+ );
}
- private static Operation getOperation(Exchange exchange, InfinispanConfiguration configuration) {
- String operation = exchange.getIn().getHeader(InfinispanConstants.OPERATION, String.class);
+ private static Operation getOperation(Message message, InfinispanConfiguration configuration) {
+ String operation = message.getHeader(InfinispanConstants.OPERATION, String.class);
if (operation == null) {
if (configuration.hasCommand()) {
operation = InfinispanConstants.OPERATION + configuration.getCommand();
@@ -51,310 +60,306 @@ public final class InfinispanOperation {
}
}
LOGGER.trace("Operation: [{}]", operation);
- return Operation.valueOf(operation.substring(InfinispanConstants.OPERATION.length()).toUpperCase());
+ return Operation.fromOperation(operation);
}
private enum Operation {
PUT {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
Object result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- result = cache.put(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ result = cache.put(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.put(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.put(getKey(message), getValue(message), lifespan, timeUnit);
}
} else {
- result = cache.put(getKey(exchange), getValue(exchange));
+ result = cache.put(getKey(message), getValue(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, PUTASYNC {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
NotifyingFuture result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ result = cache.putAsync(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.putAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.putAsync(getKey(message), getValue(message), lifespan, timeUnit);
}
} else {
- result = cache.putAsync(getKey(exchange), getValue(exchange));
+ result = cache.putAsync(getKey(message), getValue(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, PUTALL {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- cache.putAll(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ cache.putAll(getMap(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- cache.putAll(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ cache.putAll(getMap(message), lifespan, timeUnit);
}
} else {
- cache.putAll(getMap(exchange));
+ cache.putAll(getMap(message));
}
}
}, PUTALLASYNC {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
NotifyingFuture result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ result = cache.putAllAsync(getMap(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.putAllAsync(getMap(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.putAllAsync(getMap(message), lifespan, timeUnit);
}
} else {
- result = cache.putAllAsync(getMap(exchange));
+ result = cache.putAllAsync(getMap(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, PUTIFABSENT {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
Object result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- result = cache.putIfAbsent(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ result = cache.putIfAbsent(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.putIfAbsent(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.putIfAbsent(getKey(message), getValue(message), lifespan, timeUnit);
}
} else {
- result = cache.putIfAbsent(getKey(exchange), getValue(exchange));
+ result = cache.putIfAbsent(getKey(message), getValue(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, PUTIFABSENTASYNC {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
NotifyingFuture result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ result = cache.putIfAbsentAsync(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.putIfAbsentAsync(getKey(message), getValue(message), lifespan, timeUnit);
}
} else {
- result = cache.putIfAbsentAsync(getKey(exchange), getValue(exchange));
+ result = cache.putIfAbsentAsync(getKey(message), getValue(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, GET {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- setResult(cache.get(getKey(exchange)), exchange);
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ setResult(cache.get(getKey(message)), message);
}
}, CONTAINSKEY {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- setResult(cache.containsKey(getKey(exchange)), exchange);
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ setResult(cache.containsKey(getKey(message)), message);
}
}, CONTAINSVALUE {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- Object result = cache.containsValue(getValue(exchange));
- setResult(result, exchange);
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ setResult(cache.containsValue(getValue(message)), message);
}
}, REMOVE {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
Object result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (ObjectHelper.isEmpty(getValue(exchange))) {
- result = cache.remove(getKey(exchange));
+ if (ObjectHelper.isEmpty(getValue(message))) {
+ result = cache.remove(getKey(message));
} else {
- result = cache.remove(getKey(exchange), getValue(exchange));
+ result = cache.remove(getKey(message), getValue(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, REMOVEASYNC {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
NotifyingFuture result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (ObjectHelper.isEmpty(getValue(exchange))) {
- result = cache.removeAsync(getKey(exchange));
+ if (ObjectHelper.isEmpty(getValue(message))) {
+ result = cache.removeAsync(getKey(message));
} else {
- result = cache.removeAsync(getKey(exchange), getValue(exchange));
+ result = cache.removeAsync(getKey(message), getValue(message));
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, REPLACE {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
Object result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- if (ObjectHelper.isEmpty(getOldValue(exchange))) {
- result = cache.replace(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ if (ObjectHelper.isEmpty(getOldValue(message))) {
+ result = cache.replace(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ result = cache.replace(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
}
} else {
- if (ObjectHelper.isEmpty(getOldValue(exchange))) {
- result = cache.replace(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ if (ObjectHelper.isEmpty(getOldValue(message))) {
+ result = cache.replace(getKey(message), getValue(message), lifespan, timeUnit);
} else {
- result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.replace(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit);
}
}
} else {
- if (ObjectHelper.isEmpty(getOldValue(exchange))) {
- result = cache.replace(getKey(exchange), getValue(exchange));
+ if (ObjectHelper.isEmpty(getOldValue(message))) {
+ result = cache.replace(getKey(message), getValue(message));
} else {
- result = cache.replace(getKey(exchange), getOldValue(exchange), getValue(exchange));
+ result = cache.replace(getKey(message), getOldValue(message), getValue(message));
}
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, REPLACEASYNC {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
NotifyingFuture result;
- if (!isInHeaderEmpty(exchange, InfinispanConstants.IGNORE_RETURN_VALUES)) {
- cache = InfinispanUtil.ignoreReturnValuesCache(cache);
- }
- if (!isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME) && !isInHeaderEmpty(exchange, InfinispanConstants.LIFESPAN_TIME_UNIT)) {
- long lifespan = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
- String timeUnit = exchange.getIn().getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, String.class);
- if (!isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME)
- && !isInHeaderEmpty(exchange, InfinispanConstants.MAX_IDLE_TIME_UNIT)) {
- long maxIdle = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
- String maxIdleTimeUnit = exchange.getIn().getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, String.class);
- if (ObjectHelper.isEmpty(getOldValue(exchange))) {
- result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ if (hasLifespan(message)) {
+ long lifespan = message.getHeader(InfinispanConstants.LIFESPAN_TIME, long.class);
+ TimeUnit timeUnit = message.getHeader(InfinispanConstants.LIFESPAN_TIME_UNIT, TimeUnit.class);
+ if (hasMaxIdleTime(message)) {
+ long maxIdle = message.getHeader(InfinispanConstants.MAX_IDLE_TIME, long.class);
+ TimeUnit maxIdleTimeUnit = message.getHeader(InfinispanConstants.MAX_IDLE_TIME_UNIT, TimeUnit.class);
+ if (ObjectHelper.isEmpty(getOldValue(message))) {
+ result = cache.replaceAsync(getKey(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
} else {
- result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit), maxIdle, TimeUnit.valueOf(maxIdleTimeUnit));
+ result = cache.replaceAsync(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit, maxIdle, maxIdleTimeUnit);
}
} else {
- if (ObjectHelper.isEmpty(getOldValue(exchange))) {
- result = cache.replaceAsync(getKey(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ if (ObjectHelper.isEmpty(getOldValue(message))) {
+ result = cache.replaceAsync(getKey(message), getValue(message), lifespan, timeUnit);
} else {
- result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange), lifespan, TimeUnit.valueOf(timeUnit));
+ result = cache.replaceAsync(getKey(message), getOldValue(message), getValue(message), lifespan, timeUnit);
}
}
} else {
- if (ObjectHelper.isEmpty(getOldValue(exchange))) {
- result = cache.replaceAsync(getKey(exchange), getValue(exchange));
+ if (ObjectHelper.isEmpty(getOldValue(message))) {
+ result = cache.replaceAsync(getKey(message), getValue(message));
} else {
- result = cache.replaceAsync(getKey(exchange), getOldValue(exchange), getValue(exchange));
+ result = cache.replaceAsync(getKey(message), getOldValue(message), getValue(message));
}
}
- setResult(result, exchange);
+ setResult(result, message);
}
}, SIZE {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- setResult(cache.size(), exchange);
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ setResult(cache.size(), message);
}
}, CLEAR {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
cache.clear();
}
}, CLEARASYNC {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- setResult(cache.clearAsync(), exchange);
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ setResult(cache.clearAsync(), message);
}
}, QUERY {
@Override
- void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- Query query = getQuery(configuration, cache, exchange);
+ void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ Query query = getQuery(configuration, cache, message);
if (query == null) {
return;
}
- setResult(query.list(), exchange);
+ setResult(query.list(), message);
}
};
- void setResult(Object result, Exchange exchange) {
- exchange.getIn().setHeader(InfinispanConstants.RESULT, result);
+ private static final Operation[] OPERATIONS = values();
+
+ void setResult(Object result, Message message) {
+ message.setHeader(InfinispanConstants.RESULT, result);
}
- Object getKey(Exchange exchange) {
- return exchange.getIn().getHeader(InfinispanConstants.KEY);
+ Object getKey(Message message) {
+ return message.getHeader(InfinispanConstants.KEY);
}
- Object getValue(Exchange exchange) {
- return exchange.getIn().getHeader(InfinispanConstants.VALUE);
+ Object getValue(Message message) {
+ return message.getHeader(InfinispanConstants.VALUE);
}
- Object getOldValue(Exchange exchange) {
- return exchange.getIn().getHeader(InfinispanConstants.OLD_VALUE);
+ Object getOldValue(Message message) {
+ return message.getHeader(InfinispanConstants.OLD_VALUE);
}
- Map<? extends Object, ? extends Object> getMap(Exchange exchange) {
- return (Map<? extends Object, ? extends Object>) exchange.getIn().getHeader(InfinispanConstants.MAP);
+ Map<? extends Object, ? extends Object> getMap(Message message) {
+ return (Map<? extends Object, ? extends Object>) message.getHeader(InfinispanConstants.MAP);
}
- Query getQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
+ Query getQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
if (InfinispanUtil.isRemote(cache)) {
- return InfinispanRemoteOperation.buildQuery(configuration, cache, exchange);
+ return InfinispanRemoteOperation.buildQuery(configuration, cache, message);
} else {
return null;
}
}
- abstract void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange);
+ boolean hasLifespan(Message message) {
+ return !isHeaderEmpty(message, InfinispanConstants.LIFESPAN_TIME)
+ && !isHeaderEmpty(message, InfinispanConstants.LIFESPAN_TIME_UNIT);
+ }
+
+ boolean hasMaxIdleTime(Message message) {
+ return !isHeaderEmpty(message, InfinispanConstants.MAX_IDLE_TIME)
+ && !isHeaderEmpty(message, InfinispanConstants.MAX_IDLE_TIME_UNIT);
+ }
+
+ abstract void execute(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message);
+
+ public static Operation fromOperation(String operation) {
+ int len;
+ String name;
+
+ for (int i = OPERATIONS.length - 1; i >= 0; i--) {
+ name = OPERATIONS[i].name();
+ len = name.length();
+ if (len == operation.length() - InfinispanConstants.OPERATION_LEN) {
+ if (name.regionMatches(true, 0, operation, InfinispanConstants.OPERATION_LEN, len)) {
+ return OPERATIONS[i];
+ }
+ }
+ }
+
+ throw new IllegalArgumentException("Unknown Operation for string: " + operation);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
index 31e1e16..6fbd1c1 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanProducer.java
@@ -18,55 +18,31 @@ package org.apache.camel.component.infinispan;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
-import org.infinispan.client.hotrod.RemoteCacheManager;
-import org.infinispan.client.hotrod.configuration.Configuration;
-import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
-import org.infinispan.commons.api.BasicCache;
-import org.infinispan.commons.api.BasicCacheContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class InfinispanProducer extends DefaultProducer {
- private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class);
private final InfinispanConfiguration configuration;
- private BasicCacheContainer cacheContainer;
- private boolean isManagedCacheContainer;
+ private final InfinispanManager manager;
public InfinispanProducer(InfinispanEndpoint endpoint, InfinispanConfiguration configuration) {
super(endpoint);
this.configuration = configuration;
+ this.manager = new InfinispanManager(endpoint.getCamelContext(), configuration);
}
@Override
public void process(Exchange exchange) throws Exception {
- InfinispanOperation.process(exchange, configuration, getCache(exchange));
+ InfinispanOperation.process(exchange, configuration, manager.getCache(exchange));
}
@Override
protected void doStart() throws Exception {
- cacheContainer = configuration.getCacheContainer();
- if (cacheContainer == null) {
- Configuration config = new ConfigurationBuilder().classLoader(Thread.currentThread().getContextClassLoader()).addServers(configuration.getHost()).build();
- cacheContainer = new RemoteCacheManager(config, true);
- isManagedCacheContainer = true;
- }
super.doStart();
+ manager.start();
}
@Override
protected void doStop() throws Exception {
- if (isManagedCacheContainer) {
- cacheContainer.stop();
- }
+ manager.stop();
super.doStop();
}
-
- private BasicCache<Object, Object> getCache(Exchange exchange) {
- String cacheName = exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, String.class);
- if (cacheName == null) {
- cacheName = configuration.getCacheName();
- }
- LOGGER.trace("Cache[{}]", cacheName);
- return cacheName != null ? cacheContainer.getCache(cacheName) : cacheContainer.getCache();
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
index 95a5d04..3bc6c2a 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
@@ -16,8 +16,17 @@
*/
package org.apache.camel.component.infinispan;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ResourceHelper;
+import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
@@ -68,10 +77,20 @@ public final class InfinispanUtil {
}
@SuppressWarnings("unchecked")
+ public static <K, V> Cache<K, V> asEmbedded(BasicCache<K, V> cache) {
+ return Cache.class.cast(cache);
+ }
+
+ @SuppressWarnings("unchecked")
public static <K, V> RemoteCache<K, V> asRemote(BasicCache<K, V> cache) {
return RemoteCache.class.cast(cache);
}
+ @SuppressWarnings("unchecked")
+ public static <K, V> AdvancedCache<K, V> asAdvanced(BasicCache<K, V> cache) {
+ return Cache.class.cast(cache).getAdvancedCache();
+ }
+
public static <K, V> BasicCache<K, V> ignoreReturnValuesCache(BasicCache<K, V> cache) {
if (isEmbedded(cache)) {
return ((Cache<K, V>) cache).getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES);
@@ -81,10 +100,36 @@ public final class InfinispanUtil {
}
public static boolean isInHeaderEmpty(Exchange exchange, String header) {
- return ObjectHelper.isEmpty(exchange.getIn().getHeader(header));
+ return isHeaderEmpty(exchange.getIn(), header);
+ }
+
+ public static boolean isHeaderEmpty(Message message, String header) {
+ return ObjectHelper.isEmpty(message.getHeader(header));
}
public static BasicCache<Object, Object> getCache(BasicCacheContainer cacheContainer, String cacheName) {
return ObjectHelper.isEmpty(cacheName) ? cacheContainer.getCache() : cacheContainer.getCache(cacheName);
}
+
+ public static Properties loadProperties(CamelContext camelContext, String uri) throws IOException {
+ if (camelContext != null) {
+ try (InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, uri)) {
+ Properties properties = new Properties();
+ properties.load(is);
+
+ return properties;
+ } catch (IOException e) {
+ }
+ }
+
+ try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(uri)) {
+ Properties properties = new Properties();
+ properties.load(is);
+
+ return properties;
+ } catch (IOException e) {
+ }
+
+ throw new FileNotFoundException("Cannot find resource: " + uri);
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
index 4f50950..0d6f630 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanAsyncLocalEventListener.java
@@ -22,7 +22,6 @@ import org.infinispan.notifications.Listener;
@Listener(clustered = false, sync = false)
public class InfinispanAsyncLocalEventListener extends InfinispanSyncLocalEventListener {
-
public InfinispanAsyncLocalEventListener(InfinispanConsumer consumer, Set<String> eventTypes) {
super(consumer, eventTypes);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
index 113c073..d4a70ca 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanConsumerEmbeddedHandler.java
@@ -20,6 +20,7 @@ import org.apache.camel.component.infinispan.InfinispanConfiguration;
import org.apache.camel.component.infinispan.InfinispanConsumer;
import org.apache.camel.component.infinispan.InfinispanConsumerHandler;
import org.apache.camel.component.infinispan.InfinispanEventListener;
+import org.apache.camel.component.infinispan.InfinispanUtil;
import org.infinispan.Cache;
public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsumerHandler {
@@ -30,7 +31,7 @@ public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsum
@Override
public InfinispanEventListener start(InfinispanConsumer consumer) {
- Cache<?, ?> embeddedCache = (Cache<?, ?>) consumer.getCache();
+ Cache<?, ?> embeddedCache = InfinispanUtil.asEmbedded(consumer.getCache());
InfinispanConfiguration configuration = consumer.getConfiguration();
InfinispanEventListener listener;
if (configuration.hasCustomListener()) {
@@ -55,8 +56,7 @@ public final class InfinispanConsumerEmbeddedHandler implements InfinispanConsum
@Override
public void stop(InfinispanConsumer consumer) {
- Cache<?, ?> embeddedCache = (Cache<?, ?>) consumer.getCache();
+ Cache<?, ?> embeddedCache = InfinispanUtil.asEmbedded(consumer.getCache());
embeddedCache.removeListener(consumer.getListener());
}
-
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
index bcf6957..14fac43 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncClusteredEventListener.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.infinispan.embedded;
import java.util.Set;
+
import org.apache.camel.component.infinispan.InfinispanConsumer;
import org.apache.camel.component.infinispan.InfinispanEventListener;
import org.infinispan.notifications.Listener;
@@ -25,16 +26,12 @@ import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Listener(clustered = true, sync = true)
public class InfinispanSyncClusteredEventListener extends InfinispanEventListener {
- /*clustered listeners only listen for post events*/
+ // clustered listeners only listen for post events
private static final boolean IS_PRE = false;
- private final transient Logger logger = LoggerFactory.getLogger(this.getClass());
-
public InfinispanSyncClusteredEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
super(infinispanConsumer, eventTypes);
}
@@ -44,8 +41,6 @@ public class InfinispanSyncClusteredEventListener extends InfinispanEventListene
@CacheEntryRemoved
@CacheEntryExpired
public void processEvent(CacheEntryEvent<Object, Object> event) {
- logger.trace("Received CacheEntryEvent [{}]", event);
-
if (isAccepted(event.getType().toString())) {
infinispanConsumer.processEvent(event.getType().toString(), IS_PRE, event.getCache().getName(), event.getKey());
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
index 1cf9325..e8112bd 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanSyncLocalEventListener.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.infinispan.embedded;
import java.util.Set;
+
import org.apache.camel.component.infinispan.InfinispanConsumer;
import org.apache.camel.component.infinispan.InfinispanEventListener;
import org.infinispan.notifications.Listener;
@@ -30,13 +31,9 @@ import org.infinispan.notifications.cachelistener.annotation.CacheEntryPassivate
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Listener(clustered = false, sync = true)
public class InfinispanSyncLocalEventListener extends InfinispanEventListener {
- private final transient Logger logger = LoggerFactory.getLogger(this.getClass());
-
public InfinispanSyncLocalEventListener(InfinispanConsumer infinispanConsumer, Set<String> eventTypes) {
super(infinispanConsumer, eventTypes);
}
@@ -51,8 +48,6 @@ public class InfinispanSyncLocalEventListener extends InfinispanEventListener {
@CacheEntryVisited
@CacheEntryExpired
public void processEvent(CacheEntryEvent<Object, Object> event) {
- logger.trace("Received CacheEntryEvent [{}]", event);
-
dispatch(event.getType().toString(), event.isPre(), event.getCache().getName(), event.getKey());
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
index e7838b3..b28a460 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/processor/idempotent/InfinispanIdempotentRepository.java
@@ -29,17 +29,19 @@ import org.infinispan.manager.DefaultCacheManager;
public class InfinispanIdempotentRepository extends ServiceSupport implements IdempotentRepository<Object> {
private final String cacheName;
private final BasicCacheContainer cacheContainer;
- private boolean isManagedCacheContainer;
+ private final boolean isManagedCacheContainer;
+ private BasicCache<Object, Boolean> cache;
public InfinispanIdempotentRepository(BasicCacheContainer cacheContainer, String cacheName) {
this.cacheContainer = cacheContainer;
this.cacheName = cacheName;
+ this.isManagedCacheContainer = false;
}
public InfinispanIdempotentRepository(String cacheName) {
- cacheContainer = new DefaultCacheManager();
+ this.cacheContainer = new DefaultCacheManager();
this.cacheName = cacheName;
- isManagedCacheContainer = true;
+ this.isManagedCacheContainer = true;
}
public InfinispanIdempotentRepository() {
@@ -106,16 +108,21 @@ public class InfinispanIdempotentRepository extends ServiceSupport implements Id
@Override
protected void doShutdown() throws Exception {
- super.doShutdown();
if (isManagedCacheContainer) {
cacheContainer.stop();
}
+
+ super.doShutdown();
}
private BasicCache<Object, Boolean> getCache() {
- return cacheName != null
+ if (cache == null) {
+ cache = cacheName != null
? cacheContainer.<Object, Boolean>getCache(cacheName)
: cacheContainer.<Object, Boolean>getCache();
+ }
+
+ return cache;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
index dd0f590..42b3cec 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
@@ -16,7 +16,7 @@
*/
package org.apache.camel.component.infinispan.remote;
-import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.component.infinispan.InfinispanConfiguration;
import org.apache.camel.component.infinispan.InfinispanConstants;
import org.apache.camel.component.infinispan.InfinispanQueryBuilder;
@@ -30,8 +30,8 @@ public final class InfinispanRemoteOperation {
private InfinispanRemoteOperation() {
}
- public static Query buildQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Exchange exchange) {
- InfinispanQueryBuilder queryBuilder = exchange.getIn().getHeader(InfinispanConstants.QUERY_BUILDER, InfinispanQueryBuilder.class);
+ public static Query buildQuery(InfinispanConfiguration configuration, BasicCache<Object, Object> cache, Message message) {
+ InfinispanQueryBuilder queryBuilder = message.getHeader(InfinispanConstants.QUERY_BUILDER, InfinispanQueryBuilder.class);
if (queryBuilder == null) {
queryBuilder = configuration.getQueryBuilder();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
index b47b1db..747a9e4 100644
--- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanClusterTestSupport.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.infinispan;
import java.util.List;
+
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.infinispan.Cache;
@@ -90,15 +91,18 @@ public class InfinispanClusterTestSupport extends CamelTestSupport {
} catch (Throwable ex) {
throw new Exception(ex);
}
+
super.setUp();
}
@Override
public void tearDown() throws Exception {
+ super.tearDown();
+
+ // Has to be done later, maybe CamelTestSupport should
for (BasicCacheContainer container: clusteredCacheContainers) {
container.stop();
}
- super.tearDown();
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java
new file mode 100644
index 0000000..48c7772
--- /dev/null
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanConfigurationTestIT.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+import org.infinispan.cache.impl.DecoratedCache;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.commons.api.BasicCache;
+import org.infinispan.manager.DefaultCacheManager;
+import org.jgroups.util.UUID;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class InfinispanConfigurationTestIT {
+
+ @Test
+ public void embeddedCacheWithFlagsTest() throws Exception {
+ InfinispanConfiguration configuration = new InfinispanConfiguration();
+ configuration.setHost("localhost");
+ configuration.setCacheName("misc_cache");
+ configuration.setCacheContainer(new DefaultCacheManager(true));
+ configuration.setFlags(
+ org.infinispan.context.Flag.SKIP_CACHE_LOAD,
+ org.infinispan.context.Flag.SKIP_CACHE_STORE
+ );
+
+ InfinispanManager manager = new InfinispanManager(configuration);
+ manager.start();
+
+ BasicCache<Object, Object> cache = manager.getCache();
+ assertNotNull(cache);
+ assertTrue(cache instanceof DecoratedCache);
+
+ DecoratedCache<Object, Object> decoratedCache = (DecoratedCache<Object, Object>)cache;
+ assertTrue(decoratedCache.getFlags().contains(org.infinispan.context.Flag.SKIP_CACHE_LOAD));
+ assertTrue(decoratedCache.getFlags().contains(org.infinispan.context.Flag.SKIP_CACHE_STORE));
+
+ manager.getCacheContainer().stop();
+ manager.stop();
+ }
+
+ @Test
+ public void remoteCacheWithoutProperties() throws Exception {
+ InfinispanConfiguration configuration = new InfinispanConfiguration();
+ configuration.setHost("localhost");
+ configuration.setCacheName("misc_cache");
+
+ InfinispanManager manager = new InfinispanManager(configuration);
+ manager.start();
+
+ BasicCache<Object, Object> cache = manager.getCache();
+ assertNotNull(cache);
+ assertTrue(cache instanceof RemoteCache);
+
+ RemoteCache<Object, Object> remoteCache = InfinispanUtil.asRemote(cache);
+
+ String key = UUID.randomUUID().toString();
+ assertNull(remoteCache.put(key, "val1"));
+ assertNull(remoteCache.put(key, "val2"));
+
+ manager.stop();
+ }
+
+ @Test
+ public void remoteCacheWithPropertiesTest() throws Exception {
+ InfinispanConfiguration configuration = new InfinispanConfiguration();
+ configuration.setHost("localhost");
+ configuration.setCacheName("misc_cache");
+ configuration.setConfigurationUri("infinispan/client.properties");
+
+ InfinispanManager manager = new InfinispanManager(configuration);
+ manager.start();
+
+ BasicCache<Object, Object> cache = manager.getCache();
+ assertNotNull(cache);
+ assertTrue(cache instanceof RemoteCache);
+
+ String key = UUID.randomUUID().toString();
+ assertNull(cache.put(key, "val1"));
+ assertNotNull(cache.put(key, "val2"));
+
+ manager.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/test/resources/infinispan/client.properties
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/resources/infinispan/client.properties b/components/camel-infinispan/src/test/resources/infinispan/client.properties
new file mode 100644
index 0000000..4cacbe2
--- /dev/null
+++ b/components/camel-infinispan/src/test/resources/infinispan/client.properties
@@ -0,0 +1,20 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+# Whether or not to implicitly Flag.FORCE_RETURN_VALUE for all calls.
+# The default value for this property is false.
+infinispan.client.hotrod.force_return_values = true
http://git-wip-us.apache.org/repos/asf/camel/blob/0de49d45/components/camel-infinispan/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/resources/log4j.xml b/components/camel-infinispan/src/test/resources/log4j.xml
new file mode 100644
index 0000000..71a2b38
--- /dev/null
+++ b/components/camel-infinispan/src/test/resources/log4j.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+ <appender name="FILE" class="org.apache.log4j.FileAppender">
+ <param name="File" value="target/camel-infinispan-test.log" />
+ <param name="Append" value="true" />
+ <param name="Threshold" value="INFO" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d [%-15.15t] %-5p %-30.30c{1} - %m%n" />
+ </layout>
+ </appender>
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Threshold" value="INFO" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%30.30t] %-30.30c{1} %-5p %m%n" />
+ </layout>
+ </appender>
+
+ <!-- ================ -->
+ <!-- Limit categories -->
+ <!-- ================ -->
+ <category name="org.jboss.arquillian">
+ <priority value="INFO" />
+ </category>
+
+ <category name="org.infinispan.arquillian.core.WithRunningServerObserver">
+ <priority value="INFO" />
+ </category>
+
+ <category name="org.infinispan.server.test.util.TestsuiteListener">
+ <priority value="INFO" />
+ </category>
+
+ <category name="org.apache.http">
+ <priority value="INFO" />
+ </category>
+
+ <category name="org.infinispan">
+ <priority value="INFO" />
+ </category>
+
+ <category name="org.jgroups">
+ <priority value="INFO" />
+ </category>
+
+ <category name="org.apache.commons.httpclient.auth">
+ <priority value="INFO"/>
+ </category>
+
+ <category name="org.apache.auth">
+ <priority value="INFO"/>
+ </category>
+
+ <category name="org.apache.directory">
+ <priority value="WARN"/>
+ </category>
+
+ <!-- ======================= -->
+ <!-- Setup the Root category -->
+ <!-- ======================= -->
+
+ <root>
+ <priority value="INFO" />
+ <appender-ref ref="FILE" />
+ </root>
+
+</log4j:configuration>