You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/13 12:45:31 UTC

incubator-ignite git commit: # Register client continuous listeners on node join

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1245 [created] a246057c3


# Register client continuous listeners on node join


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a246057c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a246057c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a246057c

Branch: refs/heads/ignite-1245
Commit: a246057c3240acaa37ae28873a217c771ef93b6a
Parents: 36f7ba6
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 13 13:05:43 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 13 13:45:21 2015 +0300

----------------------------------------------------------------------
 .../continuous/GridContinuousProcessor.java     |  34 ++++-
 .../IgniteCacheContinuousQueryClientTest.java   | 134 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   1 +
 3 files changed, 166 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a246057c/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index daa9494..8c8e259 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -193,10 +193,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                         unregisterRemote(routineId);
 
                         if (snd.isClient()) {
-                            Map<UUID, LocalRoutineInfo> infoMap = clientInfos.get(snd.id());
+                            Map<UUID, LocalRoutineInfo> clientRouteMap = clientInfos.get(snd.id());
 
-                            if (infoMap != null)
-                                infoMap.remove(msg.routineId());
+                            if (clientRouteMap != null)
+                                clientRouteMap.remove(msg.routineId());
                         }
                     }
                 }
@@ -370,6 +370,34 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
 
             for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
+                UUID clientNodeId = entry.getKey();
+
+                Map<UUID, LocalRoutineInfo> clientRouteMap = entry.getValue();
+
+                for (Map.Entry<UUID, LocalRoutineInfo> e : clientRouteMap.entrySet()) {
+                    UUID routineId = e.getKey();
+                    LocalRoutineInfo info = e.getValue();
+
+                    try {
+                        if (info.prjPred != null)
+                            ctx.resource().injectGeneric(info.prjPred);
+
+                        if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
+                            if (registerHandler(clientNodeId,
+                                routineId,
+                                info.hnd,
+                                info.bufSize,
+                                info.interval,
+                                info.autoUnsubscribe,
+                                false))
+                                info.hnd.onListenerRegistered(routineId, ctx);
+                        }
+                    }
+                    catch (IgniteCheckedException err) {
+                        U.error(log, "Failed to register continuous handler.", err);
+                    }
+                }
+
                 Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey());
 
                 if (map == null) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a246057c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
new file mode 100644
index 0000000..7341dea
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.event.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Tests continuous query event delivery to a client node's local listener when nodes join after the query was started, and that no event arrives after the query cursor is closed.
+ */
+public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest {
+    /** IP finder instance. NOTE(review): not referenced anywhere else in this class as shown - confirm whether it should be set on the discovery SPI. */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Value passed to {@code setClientMode} for every configuration built by {@link #getConfiguration(String)}. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration(); // No cache name is set; the test accesses the cache via cache(null).
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setClientMode(client); // Flag is flipped by testNodeJoins() around the client node start.
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeJoins() throws Exception {
+        startGrids(2); // 'client' still has its default value (false) here.
+
+        client = true;
+
+        Ignite clientNode = startGrid(3); // Configured with setClientMode(true).
+
+        client = false; // Nodes started below are configured with setClientMode(false).
+
+        CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = clientNode.cache(null).query(qry); // Continuous query is started from the client node.
+
+        Ignite joined1 = startGrid(4); // Node started while the query cursor is still open.
+
+        IgniteCache<Object, Object> joinedCache1 = joined1.cache(null);
+
+        joinedCache1.put(primaryKey(joinedCache1), 1); // primaryKey() is inherited; presumably returns a key whose primary is the joined node - TODO confirm.
+
+        assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS)); // Listener must be notified within 5 seconds.
+
+        cur.close();
+
+        lsnr.latch = new CountDownLatch(1); // Fresh latch so that any event received after close() is detectable.
+
+        Ignite joined2 = startGrid(5); // Node started after the query cursor was closed.
+
+        IgniteCache<Object, Object> joinedCache2 = joined2.cache(null);
+
+        joinedCache2.put(primaryKey(joinedCache2), 2);
+
+        U.sleep(1000); // Leave time for an unexpected event to arrive.
+
+        assertEquals("Unexpected event received.", 1, lsnr.latch.getCount()); // Count still 1 means onUpdated() did not count down the new latch.
+    }
+
+    /**
+     * Listener that logs every received cache entry event and counts down {@link #latch} once per event.
+     */
+    private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+        /** Counted down for each received event; reassigned by the test after the cursor is closed. */
+        private volatile CountDownLatch latch = new CountDownLatch(1);
+
+        /** Logger; annotated with {@link LoggerResource}, never assigned explicitly in this class. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            for (CacheEntryEvent<?, ?> evt : evts) {
+                log.info("Received cache event: " + evt);
+
+                latch.countDown();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a246057c/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 2d7d0ce..a3849d7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -98,6 +98,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
+        suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
 
         // Reduce fields queries.
         suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);