You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/11 12:07:57 UTC

[2/2] camel git commit: CAMEL-9685 - camel-infinispan : support continuous query in consumer

CAMEL-9685 - camel-infinispan : support continuous query in consumer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9df244d2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9df244d2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9df244d2

Branch: refs/heads/master
Commit: 9df244d2f7e2e0979f429e1db5448787ce6cca20
Parents: 7c6737b
Author: lburgazzoli <lb...@gmail.com>
Authored: Thu Mar 10 15:39:34 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Mar 11 12:07:46 2016 +0100

----------------------------------------------------------------------
 .../infinispan/InfinispanConstants.java         |   3 +
 .../infinispan/InfinispanConsumer.java          |  70 ++++++--
 .../component/infinispan/InfinispanUtil.java    |   4 +
 .../remote/InfinispanConsumerRemoteHandler.java |   3 +-
 .../remote/InfinispanRemoteOperation.java       |   7 +-
 .../infinispan/InfinispanContinuousQueryIT.java | 171 +++++++++++++++++++
 .../camel/component/infinispan/UserUtils.java   |   7 +
 7 files changed, 248 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
index d2a5f95..1a27fd7 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConstants.java
@@ -50,4 +50,7 @@ public interface InfinispanConstants {
     String EVENT_DATA = "CamelInfinispanEventData";
     String QUERY = "CamelInfinispanOperationQuery";
     String QUERY_BUILDER = "CamelInfinispanQueryBuilder";
+
+    String CACHE_ENTRY_JOINING = "CacheEntryJoining";
+    String CACHE_ENTRY_LEAVING = "CacheEntryLeaving";
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
index 90a5e3e..b4db422 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
@@ -20,11 +20,15 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.infinispan.embedded.InfinispanConsumerEmbeddedHandler;
 import org.apache.camel.component.infinispan.remote.InfinispanConsumerRemoteHandler;
+import org.apache.camel.component.infinispan.remote.InfinispanRemoteOperation;
 import org.apache.camel.impl.DefaultConsumer;
-import org.infinispan.Cache;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.Search;
 import org.infinispan.commons.api.BasicCache;
 import org.infinispan.commons.api.BasicCacheContainer;
-import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.query.api.continuous.ContinuousQuery;
+import org.infinispan.query.api.continuous.ContinuousQueryListener;
+import org.infinispan.query.dsl.Query;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,8 +36,9 @@ public class InfinispanConsumer extends DefaultConsumer {
     private static final transient Logger LOGGER = LoggerFactory.getLogger(InfinispanProducer.class);
     private final InfinispanConfiguration configuration;
     private InfinispanEventListener listener;
-    private EmbeddedCacheManager cacheManager;
+    private InfinispanConsumerHandler consumerHandler;
     private BasicCache<Object, Object> cache;
+    private ContinuousQuery<Object, Object> continuousQuery;
 
     public InfinispanConsumer(InfinispanEndpoint endpoint, Processor processor, InfinispanConfiguration configuration) {
         super(endpoint, processor);
@@ -63,25 +68,44 @@ public class InfinispanConsumer extends DefaultConsumer {
 
     @Override
     protected void doStart() throws Exception {
+        super.doStart();
+
         BasicCacheContainer cacheContainer = configuration.getCacheContainer();
-        String cacheName = configuration.getCacheName();
-        cache = cacheName == null ? cacheContainer.getCache() : cacheContainer.getCache(cacheName);
-        if (InfinispanUtil.isEmbedded(cacheContainer)) {
-            listener = InfinispanConsumerEmbeddedHandler.INSTANCE.start(this);
-        } else if (InfinispanUtil.isRemote(cacheContainer)) {
-            listener = InfinispanConsumerRemoteHandler.INSTANCE.start(this);
+        cache = InfinispanUtil.getCache(configuration.getCacheContainer(), configuration.getCacheName());
+
+        if (configuration.hasQueryBuilder()) {
+            if (InfinispanUtil.isRemote(cache)) {
+                RemoteCache<Object, Object> remoteCache = InfinispanUtil.asRemote(cache);
+                Query query = InfinispanRemoteOperation.buildQuery(configuration.getQueryBuilder(), remoteCache);
+
+                continuousQuery = Search.getContinuousQuery(remoteCache);
+                continuousQuery.addContinuousQueryListener(query, new ContinuousQueryEventListener(cache.getName()));
+            } else {
+                throw new IllegalArgumentException("Can't run continuous queries against embedded cache (" + cache.getName() + ")");
+            }
         } else {
-            throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + cacheContainer);
-        }
-        super.doStart();
+            if (InfinispanUtil.isEmbedded(cacheContainer)) {
+                consumerHandler = InfinispanConsumerEmbeddedHandler.INSTANCE;
+            } else if (InfinispanUtil.isRemote(cacheContainer)) {
+                consumerHandler = InfinispanConsumerRemoteHandler.INSTANCE;
+            } else {
+                throw new UnsupportedOperationException("Consumer not support for CacheContainer: " + cacheContainer);
+            }
 
+            listener = consumerHandler.start(this);
+        }
     }
 
     @Override
     protected void doStop() throws Exception {
-        if (cacheManager != null) {
-            cacheManager.removeListener(listener);
+        if (continuousQuery != null) {
+            continuousQuery.removeAllListeners();
+        }
+
+        if (consumerHandler != null) {
+            consumerHandler.stop(this);
         }
+
         super.doStop();
     }
 
@@ -96,4 +120,22 @@ public class InfinispanConsumer extends DefaultConsumer {
     public InfinispanConfiguration getConfiguration() {
         return configuration;
     }
+
+    private class ContinuousQueryEventListener implements ContinuousQueryListener<Object, Object> {
+        private final String cacheName;
+
+        public ContinuousQueryEventListener(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
+        @Override
+        public void resultJoining(Object key, Object value) {
+            processEvent(InfinispanConstants.CACHE_ENTRY_JOINING, false, cacheName, key, value);
+        }
+
+        @Override
+        public void resultLeaving(Object key) {
+            processEvent(InfinispanConstants.CACHE_ENTRY_LEAVING, false, cacheName, key);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
index f12630f..95a5d04 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
@@ -83,4 +83,8 @@ public final class InfinispanUtil {
     public static boolean isInHeaderEmpty(Exchange exchange, String header) {
         return ObjectHelper.isEmpty(exchange.getIn().getHeader(header));
     }
+
+    public static BasicCache<Object, Object> getCache(BasicCacheContainer cacheContainer, String cacheName) {
+        return ObjectHelper.isEmpty(cacheName) ? cacheContainer.getCache() : cacheContainer.getCache(cacheName);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java
index 80f44bc..c3ad05f 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanConsumerRemoteHandler.java
@@ -50,8 +50,7 @@ public final class InfinispanConsumerRemoteHandler implements InfinispanConsumer
 
     @Override
     public void stop(InfinispanConsumer consumer) {
-        RemoteCache<?, ?> remoteCache = (RemoteCache<?, ?>) consumer.getCache();
-        remoteCache.removeClientListener(consumer.getListener());
+        InfinispanUtil.asRemote(consumer.getCache()).removeClientListener(consumer.getListener());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
index 530a396..dd0f590 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteOperation.java
@@ -21,6 +21,7 @@ import org.apache.camel.component.infinispan.InfinispanConfiguration;
 import org.apache.camel.component.infinispan.InfinispanConstants;
 import org.apache.camel.component.infinispan.InfinispanQueryBuilder;
 import org.apache.camel.component.infinispan.InfinispanUtil;
+import org.infinispan.client.hotrod.RemoteCache;
 import org.infinispan.client.hotrod.Search;
 import org.infinispan.commons.api.BasicCache;
 import org.infinispan.query.dsl.Query;
@@ -43,6 +44,10 @@ public final class InfinispanRemoteOperation {
     }
 
     public static Query buildQuery(InfinispanQueryBuilder queryBuilder, BasicCache<Object, Object> cache) {
-        return queryBuilder != null ? queryBuilder.build(Search.getQueryFactory(InfinispanUtil.asRemote(cache))) : null;
+        return buildQuery(queryBuilder, InfinispanUtil.asRemote(cache));
+    }
+
+    public static Query buildQuery(InfinispanQueryBuilder queryBuilder, RemoteCache<Object, Object> cache) {
+        return queryBuilder != null ? queryBuilder.build(Search.getQueryFactory(cache)) : null;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanContinuousQueryIT.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanContinuousQueryIT.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanContinuousQueryIT.java
new file mode 100644
index 0000000..b591555
--- /dev/null
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/InfinispanContinuousQueryIT.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan;
+
+import java.io.IOException;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
+import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller;
+import org.infinispan.commons.util.Util;
+import org.infinispan.protostream.FileDescriptorSource;
+import org.infinispan.protostream.SerializationContext;
+import org.infinispan.protostream.sampledomain.User;
+import org.infinispan.protostream.sampledomain.marshallers.GenderMarshaller;
+import org.infinispan.protostream.sampledomain.marshallers.UserMarshaller;
+import org.infinispan.query.dsl.Query;
+import org.infinispan.query.dsl.QueryFactory;
+import org.infinispan.query.remote.client.MarshallerRegistration;
+import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants;
+import org.junit.Test;
+
+import static org.apache.camel.component.infinispan.UserUtils.CQ_USERS;
+import static org.apache.camel.component.infinispan.UserUtils.createKey;
+
+public class InfinispanContinuousQueryIT extends CamelTestSupport {
+
+    private static final InfinispanQueryBuilder CONTINUOUS_QUERY_BUILDER = new InfinispanQueryBuilder() {
+        @Override
+        public Query build(QueryFactory<Query> queryFactory) {
+            return queryFactory.from(User.class)
+                .having("name").like("CQ%")
+                .toBuilder().build();
+        }
+    };
+
+    private static final InfinispanQueryBuilder CONTINUOUS_QUERY_BUILDER_NO_MATCH = new InfinispanQueryBuilder() {
+        @Override
+        public Query build(QueryFactory<Query> queryFactory) {
+            return queryFactory.from(User.class)
+                .having("name").like("%TEST%")
+                .toBuilder().build();
+        }
+    };
+
+    private static final InfinispanQueryBuilder CONTINUOUS_QUERY_BUILDER_ALL = new InfinispanQueryBuilder() {
+        @Override
+        public Query build(QueryFactory<Query> queryFactory) {
+            return queryFactory.from(User.class)
+                .having("name").like("%Q0%")
+                .toBuilder().build();
+        }
+    };
+
+    private RemoteCacheManager manager;
+    private RemoteCache<Object, Object> cache;
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        registry.bind("myCustomContainer", manager);
+        registry.bind("continuousQueryBuilder", CONTINUOUS_QUERY_BUILDER);
+        registry.bind("continuousQueryBuilderNoMatch", CONTINUOUS_QUERY_BUILDER_NO_MATCH);
+        registry.bind("continuousQueryBuilderAll", CONTINUOUS_QUERY_BUILDER_ALL);
+
+        return registry;
+    }
+
+    @Override
+    protected void doPreSetup() throws IOException {
+        ConfigurationBuilder builder = new ConfigurationBuilder()
+            .addServer()
+            .host("localhost")
+            .port(11222)
+            .marshaller(new ProtoStreamMarshaller());
+
+        manager = new RemoteCacheManager(builder.build());
+
+        RemoteCache<String, String> metadataCache = manager.getCache(
+            ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME);
+        metadataCache.put(
+            "sample_bank_account/bank.proto",
+            Util.read(InfinispanContinuousQueryIT.class.getResourceAsStream("/sample_bank_account/bank.proto")));
+
+        MarshallerRegistration.registerMarshallers(ProtoStreamMarshaller.getSerializationContext(manager));
+
+        SerializationContext serCtx = ProtoStreamMarshaller.getSerializationContext(manager);
+        serCtx.registerProtoFiles(FileDescriptorSource.fromResources("/sample_bank_account/bank.proto"));
+        serCtx.registerMarshaller(new UserMarshaller());
+        serCtx.registerMarshaller(new GenderMarshaller());
+
+        // pre-load data
+        cache = manager.getCache("remote_query");
+        cache.clear();
+    }
+
+    @Test
+    public void continuousQuery() throws Exception {
+        MockEndpoint continuousQueryBuilderNoMatch = getMockEndpoint("mock:continuousQueryNoMatch");
+        continuousQueryBuilderNoMatch.expectedMessageCount(0);
+
+        MockEndpoint continuousQueryBuilderAll = getMockEndpoint("mock:continuousQueryAll");
+        continuousQueryBuilderAll.expectedMessageCount(CQ_USERS.length * 2);
+
+        MockEndpoint continuousQuery = getMockEndpoint("mock:continuousQuery");
+        continuousQuery.expectedMessageCount(4);
+
+        for (int i = 0; i < 4; i++) {
+            continuousQuery.message(i).outHeader(InfinispanConstants.KEY).isEqualTo(createKey(CQ_USERS[i % 2]));
+            continuousQuery.message(i).outHeader(InfinispanConstants.CACHE_NAME).isEqualTo(cache.getName());
+
+            if (i >= 2) {
+                continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo(InfinispanConstants.CACHE_ENTRY_LEAVING);
+                continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_DATA).isNull();
+            } else {
+                continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_TYPE).isEqualTo(InfinispanConstants.CACHE_ENTRY_JOINING);
+                continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_DATA).isNotNull();
+                continuousQuery.message(i).outHeader(InfinispanConstants.EVENT_DATA).isInstanceOf(User.class);
+            }
+        }
+
+        for (final User user : CQ_USERS) {
+            cache.put(createKey(user), user);
+        }
+
+        assertEquals(CQ_USERS.length, cache.size());
+
+        for (final User user : CQ_USERS) {
+            cache.remove(createKey(user));
+        }
+
+        assertTrue(cache.isEmpty());
+
+        continuousQuery.assertIsSatisfied();
+        continuousQueryBuilderNoMatch.assertIsSatisfied();
+        continuousQueryBuilderAll.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("infinispan://?cacheContainer=#myCustomContainer&cacheName=remote_query&queryBuilder=#continuousQueryBuilder")
+                    .to("mock:continuousQuery");
+                from("infinispan://?cacheContainer=#myCustomContainer&cacheName=remote_query&queryBuilder=#continuousQueryBuilderNoMatch")
+                    .to("mock:continuousQueryNoMatch");
+                from("infinispan://?cacheContainer=#myCustomContainer&cacheName=remote_query&queryBuilder=#continuousQueryBuilderAll")
+                    .to("mock:continuousQueryAll");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9df244d2/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java
index 4126b83..df2ba65 100644
--- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/UserUtils.java
@@ -25,6 +25,13 @@ public final class UserUtils {
             createUser("nameA", "surnameB"),
             createUser("nameB", "surnameB")};
 
+    public static final User[] CQ_USERS = new User[]{
+        createUser("CQ01", "surname01"),
+        createUser("CQ02", "surname01"),
+        createUser("NQ03", "surname03"),
+        createUser("NQ04", "surname04")
+    };
+
     private UserUtils() {
     }