You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/08/15 03:49:14 UTC

[27/35] incubator-ignite git commit: # Register client continuous listeners on node join

 # Register client continuous listeners on node join


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/35e3e4e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/35e3e4e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/35e3e4e0

Branch: refs/heads/ignite-264
Commit: 35e3e4e048fa34b5b23ebd0ae235424f3e3492d9
Parents: aed83af
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 13 17:37:00 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 13 17:37:00 2015 +0300

----------------------------------------------------------------------
 .../continuous/GridContinuousProcessor.java     | 44 ++++++++++++++++----
 .../IgniteCacheContinuousQueryClientTest.java   | 33 ++++++++++++---
 .../IgniteCacheQuerySelfTestSuite.java          |  1 +
 scripts/git-format-patch.sh                     |  2 +-
 4 files changed, 66 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index daa9494..5f1c4bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -193,10 +193,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                         unregisterRemote(routineId);
 
                         if (snd.isClient()) {
-                            Map<UUID, LocalRoutineInfo> infoMap = clientInfos.get(snd.id());
+                            Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(snd.id());
 
-                            if (infoMap != null)
-                                infoMap.remove(msg.routineId());
+                            if (clientRoutineMap != null)
+                                clientRoutineMap.remove(msg.routineId());
                         }
                     }
                 }
@@ -370,6 +370,34 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
 
             for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
+                UUID clientNodeId = entry.getKey();
+
+                Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
+
+                for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
+                    UUID routineId = e.getKey();
+                    LocalRoutineInfo info = e.getValue();
+
+                    try {
+                        if (info.prjPred != null)
+                            ctx.resource().injectGeneric(info.prjPred);
+
+                        if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
+                            if (registerHandler(clientNodeId,
+                                routineId,
+                                info.hnd,
+                                info.bufSize,
+                                info.interval,
+                                info.autoUnsubscribe,
+                                false))
+                                info.hnd.onListenerRegistered(routineId, ctx);
+                        }
+                    }
+                    catch (IgniteCheckedException err) {
+                        U.error(log, "Failed to register continuous handler.", err);
+                    }
+                }
+
                 Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey());
 
                 if (map == null) {
@@ -723,17 +751,17 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         if (node.isClient()) {
-            Map<UUID, LocalRoutineInfo> clientRouteMap = clientInfos.get(node.id());
+            Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(node.id());
 
-            if (clientRouteMap == null) {
-                clientRouteMap = new HashMap<>();
+            if (clientRoutineMap == null) {
+                clientRoutineMap = new HashMap<>();
 
-                Map<UUID, LocalRoutineInfo> old = clientInfos.put(node.id(), clientRouteMap);
+                Map<UUID, LocalRoutineInfo> old = clientInfos.put(node.id(), clientRoutineMap);
 
                 assert old == null;
             }
 
-            clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
+            clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
                 hnd,
                 data.bufferSize(),
                 data.interval(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
index bb413a0..d66d1d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.junits.common.*;
@@ -38,7 +40,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  */
 public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest {
     /** */
-    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** */
     private boolean client;
@@ -47,6 +49,8 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
         CacheConfiguration ccfg = new CacheConfiguration();
 
         ccfg.setCacheMode(PARTITIONED);
@@ -60,6 +64,13 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -80,15 +91,27 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
 
         QueryCursor<?> cur = clientNode.cache(null).query(qry);
 
-        Ignite joined = startGrid(4);
+        Ignite joined1 = startGrid(4);
 
-        IgniteCache<Object, Object> joinedCache = joined.cache(null);
+        IgniteCache<Object, Object> joinedCache1 = joined1.cache(null);
 
-        joinedCache.put(primaryKey(joinedCache), 1);
+        joinedCache1.put(primaryKey(joinedCache1), 1);
 
         assertTrue("Failed to wait for event.", lsnr.latch.await(5, SECONDS));
 
         cur.close();
+
+        lsnr.latch = new CountDownLatch(1);
+
+        Ignite joined2 = startGrid(5);
+
+        IgniteCache<Object, Object> joinedCache2 = joined2.cache(null);
+
+        joinedCache2.put(primaryKey(joinedCache2), 2);
+
+        U.sleep(1000);
+
+        assertEquals("Unexpected event received.", 1, lsnr.latch.getCount());
     }
 
     /**
@@ -96,7 +119,7 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
      */
     private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
         /** */
-        private final CountDownLatch latch = new CountDownLatch(1);
+        private volatile CountDownLatch latch = new CountDownLatch(1);
 
         /** */
         @LoggerResource

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 2d7d0ce..a3849d7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -98,6 +98,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
+        suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
 
         // Reduce fields queries.
         suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35e3e4e0/scripts/git-format-patch.sh
----------------------------------------------------------------------
diff --git a/scripts/git-format-patch.sh b/scripts/git-format-patch.sh
index b11c73d..83aee3e 100755
--- a/scripts/git-format-patch.sh
+++ b/scripts/git-format-patch.sh
@@ -20,7 +20,7 @@
 # Git patch-file maker.
 #
 echo 'Usage: scripts/git-format-patch.sh [-ih|--ignitehome <path>] [-idb|--ignitedefbranch <branch-name>] [-ph|--patchhome <path>]'
-echo 'It is a script to create patch between Current branch (branch with changes) and Default branche. The script is safe and do not broke or lose your changes.'
+echo 'It is a script to create patch between Current branch (branch with changes) and Default branch. The script is safe and does not break or lose your changes.'
 echo "It should be called from IGNITE_HOME directory."
 echo "Patch will be created at PATCHES_HOME (= IGNITE_HOME, by default) between Default branch (IGNITE_DEFAULT_BRANCH) and Current branch."
 echo "Note: you can use ${IGNITE_HOME}/scripts/git-patch-prop-local.sh to set your own local properties (to rewrite settings at git-patch-prop-local.sh). "