You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/06 10:45:57 UTC

[20/50] [abbrv] ignite git commit: IGNITE-4284 - Fix. Failed second client node join with continuous query and peer class loading enabled.

IGNITE-4284 - Fix. Failed second client node join with continuous query and peer class loading enabled.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/357c20ab
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/357c20ab
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/357c20ab

Branch: refs/heads/ignite-3477-master
Commit: 357c20ab9593390fb7af25f8638188595c5f6cd4
Parents: b689624
Author: dkarachentsev <dk...@gridgain.com>
Authored: Thu Mar 30 12:50:42 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Thu Mar 30 12:50:42 2017 +0300

----------------------------------------------------------------------
 .../continuous/GridContinuousProcessor.java     |  39 ++---
 .../ignite/custom/DummyEventFilterFactory.java  |  47 ++++++
 .../ContinuousQueryPeerClassLoadingTest.java    | 142 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite3.java         |   2 +
 4 files changed, 214 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/357c20ab/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 9fd9b6d..6a4f57d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -495,29 +496,34 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
                 UUID clientNodeId = entry.getKey();
 
-                if (!ctx.localNodeId().equals(clientNodeId)) {
+                if (!ctx.clientNode()) {
                     Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
 
                     for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
                         UUID routineId = e.getKey();
                         LocalRoutineInfo info = e.getValue();
 
-                        try {
-                            if (info.prjPred != null)
-                                ctx.resource().injectGeneric(info.prjPred);
-
-                            if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
-                                registerHandler(clientNodeId,
-                                    routineId,
-                                    info.hnd,
-                                    info.bufSize,
-                                    info.interval,
-                                    info.autoUnsubscribe,
-                                    false);
+                        GridCacheContext cctx = ctx.cache().context().cacheContext(CU.cacheId(info.hnd.cacheName()));
+
+                        // Do not register handler if it's not affinity node.
+                        if (cctx == null || cctx.affinityNode()) {
+                            try {
+                                if (info.prjPred != null)
+                                    ctx.resource().injectGeneric(info.prjPred);
+
+                                if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
+                                    registerHandler(clientNodeId,
+                                        routineId,
+                                        info.hnd,
+                                        info.bufSize,
+                                        info.interval,
+                                        info.autoUnsubscribe,
+                                        false);
+                                }
+                            }
+                            catch (IgniteCheckedException err) {
+                                U.error(log, "Failed to register continuous handler.", err);
                             }
-                        }
-                        catch (IgniteCheckedException err) {
-                            U.error(log, "Failed to register continuous handler.", err);
                         }
                     }
                 }
@@ -583,6 +589,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @return Routine ID.
      * @throws IgniteCheckedException If failed.
      */
+    @SuppressWarnings("unchecked")
     public UUID registerStaticRoutine(
         String cacheName,
         CacheEntryUpdatedListener<?, ?> locLsnr,

http://git-wip-us.apache.org/repos/asf/ignite/blob/357c20ab/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java b/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java
new file mode 100644
index 0000000..e0688bc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.custom;
+
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+
+/**
+ * Factory of filters that accept every event. Must not be placed in the org.apache.ignite.internal package.
+ */
+public class DummyEventFilterFactory implements Factory<CacheEntryEventFilter<Integer, String>> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public CacheEntryEventFilter<Integer, String> create() {
+        return new DummyEventFilter();
+    }
+
+    /**
+     * Event filter that accepts every event.
+     */
+    private static class DummyEventFilter implements CacheEntryEventFilter<Integer, String> {
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(
+            final CacheEntryEvent<? extends Integer, ? extends String> evt) throws CacheEntryListenerException {
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/357c20ab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
new file mode 100644
index 0000000..e5d1d60
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.custom.DummyEventFilterFactory;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that the remote filter factory is correctly deployed on all nodes when peer class loading is enabled.
+ */
+public class ContinuousQueryPeerClassLoadingTest extends GridCommonAbstractTest {
+    /** Name of the cache used by the test. */
+    public static final String CACHE_NAME = "test-cache";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(true);
+        cfg.setClientMode(gridName.contains("client"));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoteFilterFactoryClient() throws Exception {
+        check("server", "client1", "client2");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoteFilterFactoryServer1() throws Exception {
+        check("server1", "server2", "client");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoteFilterFactoryServer2() throws Exception {
+        check("server1", "server2", "server3");
+    }
+
+    /**
+     * @param node1Name Name of the first node; it creates the cache and performs all puts.
+     * @param node2Name Name of the second node; it runs the first continuous query.
+     * @param node3Name Name of the third node; it runs the second continuous query.
+     */
+    private void check(String node1Name, String node2Name, String node3Name) throws Exception {
+        final Ignite node1 = startGrid(node1Name);
+
+        final IgniteCache<Integer, String> cache = node1.getOrCreateCache(CACHE_NAME);
+
+        for (int i = 0; i < 10; i++)
+            cache.put(i, String.valueOf(i));
+
+        final Ignite node2 = startGrid(node2Name);
+
+        final ContinuousQuery<Integer, String> qry1 = new ContinuousQuery<>();
+        final ContinuousQuery<Integer, String> qry2 = new ContinuousQuery<>();
+
+        qry1.setRemoteFilterFactory(new DummyEventFilterFactory());
+        qry2.setRemoteFilterFactory(new DummyEventFilterFactory());
+
+        final AtomicInteger client1Evts = new AtomicInteger(0); // NOTE(review): never used in this method.
+        final AtomicInteger client2Evts = new AtomicInteger(0); // NOTE(review): never used in this method.
+
+        final CountDownLatch latch1 = new CountDownLatch(20); // Sized for the 20 puts (keys 10..29) made after qry1 starts.
+        final CountDownLatch latch2 = new CountDownLatch(10); // Sized for the 10 puts (keys 20..29) made after qry2 starts.
+
+        qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
+            @Override public void onUpdated(
+                final Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) throws CacheEntryListenerException {
+                System.out.println(">> Client 1 events " + evts);
+                for (CacheEntryEvent<? extends Integer, ? extends String> evt : evts)
+                    latch1.countDown();
+            }
+        });
+
+        qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
+            @Override public void onUpdated(
+                final Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) throws CacheEntryListenerException {
+                System.out.println(">> Client 2 events " + evts);
+                for (CacheEntryEvent<? extends Integer, ? extends String> evt : evts)
+                    latch2.countDown();
+            }
+        });
+
+        final IgniteCache<Integer, String> cache1 = node2.cache(CACHE_NAME);
+
+        cache1.query(qry1); // NOTE(review): returned cursor is not closed here; presumably released by stopAllGrids() - confirm.
+
+        for (int i = 10; i < 20; i++)
+            cache.put(i, String.valueOf(i));
+
+        // The IGNITE-4284 failure was reported on the join of the second client, i.e. at this node start.
+        final Ignite node3 = startGrid(node3Name);
+
+        final IgniteCache<Integer, String> cache2 = node3.cache(CACHE_NAME);
+
+        cache2.query(qry2);
+
+        for (int i = 20; i < 30; i++)
+            cache.put(i, String.valueOf(i));
+
+        assert latch1.await(5, TimeUnit.SECONDS) : latch1.getCount();
+        assert latch2.await(5, TimeUnit.SECONDS) : latch2.getCount();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/357c20ab/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index a865788..6b2fea0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBin
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationSwapEnabledTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryPeerClassLoadingTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryRemoteFilterMissingInClassPathSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
@@ -123,6 +124,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
         suite.addTestSuite(CacheKeepBinaryIterationNearEnabledTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryBackupQueueTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryNoUnsubscribeTest.class);
+        suite.addTestSuite(ContinuousQueryPeerClassLoadingTest.class);
 
         return suite;
     }