You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/11 12:07:57 UTC
[2/2] camel git commit: CAMEL-9685 - camel-infinispan : support continuous query in consumer
CAMEL-9685 - camel-infinispan : support continuous query in consumer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9df244d2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9df244d2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9df244d2
Branch: refs/heads/master
Commit: 9df244d2f7e2e0979f429e1db5448787ce6cca20
Parents: 7c6737b
Author: lburgazzoli <lb...@gmail.com>
Authored: Thu Mar 10 15:39:34 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Mar 11 12:07:46 2016 +0100
----------------------------------------------------------------------
.../infinispan/InfinispanConstants.java | 3 +
.../infinispan/InfinispanConsumer.java | 70 ++++++--
.../component/infinispan/InfinispanUtil.java | 4 +
.../remote/InfinispanConsumerRemoteHandler.java | 3 +-
.../remote/InfinispanRemoteOperation.java | 7 +-
.../infinispan/InfinispanContinuousQueryIT.java | 171 +++++++++++++++++++
.../camel/component/infinispan/UserUtils.java | 7 +
7 files changed, 248 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
index d2a5f95..1a27fd7 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
@@ -50,4 +50,7 @@ public interface InfinispanConstants {
String EVENT_DATA = "CamelInfinispanEventData";
String QUERY = "CamelInfinispanOperationQuery";
String QUERY_BUILDER = "CamelInfinispanQueryBuilder";
+
+ String CACHE_ENTRY_JOINING = "CacheEntryJoining";
+ String CACHE_ENTRY_LEAVING = "CacheEntryLeaving";
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
index 90a5e3e..b4db422 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
@@ -20,11 +20,15 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.infinispan.embedded.InfinispanConsumerEmbeddedHandler;
import org.apache.camel.component.infinispan.remote.InfinispanConsumerRemoteHandler;
+import org.apache.camel.component.infinispan.remote.InfinispanRemoteOperation;
import org.apache.camel.impl.DefaultConsumer;
-import org.infinispan.Cache;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.Search;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.api.BasicCacheContainer;
-import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.query.api.continuous.ContinuousQuery;
+import org.infinispan.query.api.continuous.ContinuousQueryListener;
+import org.infinispan.query.dsl.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,8 +36,9 @@ public class InfinispanConsumer extends DefaultConsumer {
private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class);
private final InfinispanConfiguration configuration;
private InfinispanEventListener listener;
- private EmbeddedCacheManager cacheManager;
+ private InfinispanConsumerHandler consumerHandler;
private BasicCache<Object, Object> cache;
+ private ContinuousQuery<Object, Object> continuousQuery;
public InfinispanConsumer(InfinispanEndpoint endpoint, Processor processor, InfinispanConfiguration configuration) {
super(endpoint, processor);
@@ -63,25 +68,44 @@ public class InfinispanConsumer extends DefaultConsumer {
@Override
protected void doStart() throws Exception {
+ super.doStart();
+
BasicCacheContainer cacheContainer = configuration.getCacheContainer();
- String cacheName = configuration.getCacheName();
- cache = cacheName == null ? cacheContainer.getCache() : cacheContainer.getCache(cacheName);
- if (InfinispanUtil.isEmbedded(cacheContainer)) {
- listener = InfinispanConsumerEmbeddedHandler.INSTANCE.start(this);
- } else if (InfinispanUtil.isRemote(cacheContainer)) {
- listener = InfinispanConsumerRemoteHandler.INSTANCE.start(this);
+ cache = InfinispanUtil.getCache(configuration.getCacheContainer(), configuration.getCacheName());
+
+ if (configuration.hasQueryBuilder()) {
+ if (InfinispanUtil.isRemote(cache)) {
+ RemoteCache<Object, Object> remoteCache = InfinispanUtil.asRemote(cache);
+ Query query = InfinispanRemoteOperation.buildQuery(configuration.getQueryBuilder(), remoteCache);
+
+ continuousQuery = Search.getContinuousQuery(remoteCache);
+ continuousQuery.addContinuousQueryListener(query, new ContinuousQueryEventListener(cache.getName()));
+ } else {
+ throw new IllegalArgumentException("Can't run continuous queries against embedded cache (" + cache.getName() + ")");
+ }
} else {
- throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + cacheContainer);
- }
- super.doStart();
+ if (InfinispanUtil.isEmbedded(cacheContainer)) {
+ consumerHandler = InfinispanConsumerEmbeddedHandler.INSTANCE;
+ } else if (InfinispanUtil.isRemote(cacheContainer)) {
+ consumerHandler = InfinispanConsumerRemoteHandler.INSTANCE;
+ } else {
+ throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + cacheContainer);
+ }
+ listener = consumerHandler.start(this);
+ }
}
@Override
protected void doStop() throws Exception {
- if (cacheManager != null) {
- cacheManager.removeListener(listener);
+ if (continuousQuery != null) {
+ continuousQuery.removeAllListeners();
+ }
+
+ if (consumerHandler != null) {
+ consumerHandler.stop(this);
}
+
super.doStop();
}
@@ -96,4 +120,22 @@ public class InfinispanConsumer extends DefaultConsumer {
public InfinispanConfiguration getConfiguration() {
return configuration;
}
+
+ private class ContinuousQueryEventListener implements ContinuousQueryListener<Object, Object> {
+ private final String cacheName;
+
+ public ContinuousQueryEventListener(String cacheName) {
+ this.cacheName = cacheName;
+ }
+
+ @Override
+ public void resultJoining(Object key, Object value) {
+ processEvent(InfinispanConstants.CACHE_ENTRY_JOINING, false, cacheName, key, value);
+ }
+
+ @Override
+ public void resultLeaving(Object key) {
+ processEvent(InfinispanConstants.CACHE_ENTRY_LEAVING, false, cacheName, key);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
index f12630f..95a5d04 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
@@ -83,4 +83,8 @@ public final class InfinispanUtil {
public static boolean isInHeaderEmpty(Exchange exchange, String header) {
return ObjectHelper.isEmpty(exchange.getIn().getHeader(header));
}
+
+ public static BasicCache<Object, Object> getCache(BasicCacheContainer cacheContainer, String cacheName) {
+ return ObjectHelper.isEmpty(cacheName) ? cacheContainer.getCache() : cacheContainer.getCache(cacheName);
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java
index 80f44bc..c3ad05f 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java
@@ -50,8 +50,7 @@ public final class InfinispanConsumerRemoteHandler implements InfinispanConsumer
@Override
public void stop(InfinispanConsumer consumer) {
- RemoteCache<?, ?> remoteCache = (RemoteCache<?, ?>) consumer.getCache();
- remoteCache.removeClientListener(consumer.getListener());
+ InfinispanUtil.asRemote(consumer.getCache()).removeClientListener(consumer.getListener());
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
index 530a396..dd0f590 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
@@ -21,6 +21,7 @@ import org.apache.camel.component.infinispan.InfinispanConfiguration;
import org.apache.camel.component.infinispan.InfinispanConstants;
import org.apache.camel.component.infinispan.InfinispanQueryBuilder;
import org.apache.camel.component.infinispan.InfinispanUtil;
+import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.Search;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.query.dsl.Query;
@@ -43,6 +44,10 @@ public final class InfinispanRemoteOperation {
}
public static Query buildQuery(InfinispanQueryBuilder queryBuilder, BasicCache<Object, Object> cache) {
- return queryBuilder != null ? queryBuilder.build(Search.getQueryFactory(InfinispanUtil.asRemote(cache))) : null;
+ return buildQuery(queryBuilder, InfinispanUtil.asRemote(cache));
+ }
+
+ public static Query buildQuery(InfinispanQueryBuilder queryBuilder, RemoteCache<Object, Object> cache) {
+ return queryBuilder != null ? queryBuilder.build(Search.getQueryFactory(cache)) : null;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanContinuousQueryIT.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanContinuousQueryIT.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanContinuousQueryIT.java
new file mode 100644
index 0000000..b591555
--- /dev/null
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanContinuousQueryIT.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+import java.io.IOException;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
+import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller;
+import org.infinispan.commons.util.Util;
+import org.infinispan.protostream.FileDescriptorSource;
+import org.infinispan.protostream.SerializationContext;
+import org.infinispan.protostream.sampledomain.User;
+import org.infinispan.protostream.sampledomain.marshallers.GenderMarshaller;
+import org.infinispan.protostream.sampledomain.marshallers.UserMarshaller;
+import org.infinispan.query.dsl.Query;
+import org.infinispan.query.dsl.QueryFactory;
+import org.infinispan.query.remote.client.MarshallerRegistration;
+import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants;
+import org.junit.Test;
+
+import static org.apache.camel.component.infinispan.UserUtils.CQ_USERS;
+import static org.apache.camel.component.infinispan.UserUtils.createKey;
+
+public class InfinispanContinuousQueryIT extends CamelTestSupport {
+
+ private static final InfinispanQueryBuilder CONTINUOUS_QUERY_BUILDER = new InfinispanQueryBuilder() {
+ @Override
+ public Query build(QueryFactory<Query> queryFactory) {
+ return queryFactory.from(User.class)
+ .having("name").like("CQ%")
+ .toBuilder().build();
+ }
+ };
+
+ private static final InfinispanQueryBuilder CONTINUOUS_QUERY_BUILDER_NO_MATCH = new InfinispanQueryBuilder() {
+ @Override
+ public Query build(QueryFactory<Query> queryFactory) {
+ return queryFactory.from(User.class)
+ .having("name").like("%TEST%")
+ .toBuilder().build();
+ }
+ };
+
+ private static final InfinispanQueryBuilder CONTINUOUS_QUERY_BUILDER_ALL = new InfinispanQueryBuilder() {
+ @Override
+ public Query build(QueryFactory<Query> queryFactory) {
+ return queryFactory.from(User.class)
+ .having("name").like("%Q0%")
+ .toBuilder().build();
+ }
+ };
+
+ private RemoteCacheManager manager;
+ private RemoteCache<Object, Object> cache;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+ registry.bind("myCustomContainer", manager);
+ registry.bind("continuousQueryBuilder", CONTINUOUS_QUERY_BUILDER);
+ registry.bind("continuousQueryBuilderNoMatch", CONTINUOUS_QUERY_BUILDER_NO_MATCH);
+ registry.bind("continuousQueryBuilderAll", CONTINUOUS_QUERY_BUILDER_ALL);
+
+ return registry;
+ }
+
+ @Override
+ protected void doPreSetup() throws IOException {
+ ConfigurationBuilder builder = new ConfigurationBuilder()
+ .addServer()
+ .host("localhost")
+ .port(11222)
+ .marshaller(new ProtoStreamMarshaller());
+
+ manager = new RemoteCacheManager(builder.build());
+
+ RemoteCache<String, String> metadataCache = manager.getCache(
+ ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME);
+ metadataCache.put(
+ "sample_bank_account/bank.proto",
+ Util.read(InfinispanContinuousQueryIT.class.getResourceAsStream("/sample_bank_account/bank.proto")));
+
+ MarshallerRegistration.registerMarshallers(ProtoStreamMarshaller.getSerializationContext(manager));
+
+ SerializationContext serCtx = ProtoStreamMarshaller.getSerializationContext(manager);
+ serCtx.registerProtoFiles(FileDescriptorSource.fromResources("/sample_bank_account/bank.proto"));
+ serCtx.registerMarshaller(new UserMarshaller());
+ serCtx.registerMarshaller(new GenderMarshaller());
+
+ // pre-load data
+ cache = manager.getCache("remote_query");
+ cache.clear();
+ }
+
+ @Test
+ public void continuousQuery() throws Exception {
+ MockEndpoint continuousQueryBuilderNoMatch = getMockEndpoint("mock:continuousQueryNoMatch");
+ continuousQueryBuilderNoMatch.expectedMessageCount(0);
+
+ MockEndpoint continuousQueryBuilderAll = getMockEndpoint("mock:continuousQueryAll");
+ continuousQueryBuilderAll.expectedMessageCount(CQ_USERS.length * 2);
+
+ MockEndpoint continuousQuery = getMockEndpoint("mock:continuousQuery");
+ continuousQuery.expectedMessageCount(4);
+
+ for (int i = 0; i < 4; i++) {
+ continuousQuery.message(i).outHeader(InfinispanConstants.KEY).isEqualTo(createKey(CQ_USERS[i % 2]));
+ continuousQuery.message(i).outHeader(InfinispanConstants.CACHE_NAME).isEqualTo(cache.getName());
+
+ if (i >= 2) {
+ continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo(InfinispanConstants.CACHE_ENTRY_LEAVING);
+ continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_DATA).isNull();
+ } else {
+ continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo(InfinispanConstants.CACHE_ENTRY_JOINING);
+ continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_DATA).isNotNull();
+ continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_DATA).isInstanceOf(User.class);
+ }
+ }
+
+ for (final User user : CQ_USERS) {
+ cache.put(createKey(user), user);
+ }
+
+ assertEquals(CQ_USERS.length, cache.size());
+
+ for (final User user : CQ_USERS) {
+ cache.remove(createKey(user));
+ }
+
+ assertTrue(cache.isEmpty());
+
+ continuousQuery.assertIsSatisfied();
+ continuousQueryBuilderNoMatch.assertIsSatisfied();
+ continuousQueryBuilderAll.assertIsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("infinispan://?cacheContainer=#myCustomContainer&cacheName=remote_query&queryBuilder=#continuousQueryBuilder")
+ .to("mock:continuousQuery");
+ from("infinispan://?cacheContainer=#myCustomContainer&cacheName=remote_query&queryBuilder=#continuousQueryBuilderNoMatch")
+ .to("mock:continuousQueryNoMatch");
+ from("infinispan://?cacheContainer=#myCustomContainer&cacheName=remote_query&queryBuilder=#continuousQueryBuilderAll")
+ .to("mock:continuousQueryAll");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java
index 4126b83..df2ba65 100644
--- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java
@@ -25,6 +25,13 @@ public final class UserUtils {
createUser("nameA", "surnameB"),
createUser("nameB", "surnameB")};
+ public static final User[] CQ_USERS = new User[]{
+ createUser("CQ01", "surname01"),
+ createUser("CQ02", "surname01"),
+ createUser("NQ03", "surname03"),
+ createUser("NQ04", "surname04")
+ };
+
private UserUtils() {
}