You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/04/03 08:31:44 UTC
[44/50] [abbrv] ignite git commit: IGNITE-4284 - Fix. Failed second
client node join with continuous query and peer class loading enabled.
IGNITE-4284 - Fix. Failed second client node join with continuous query and peer class loading enabled.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/357c20ab
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/357c20ab
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/357c20ab
Branch: refs/heads/master
Commit: 357c20ab9593390fb7af25f8638188595c5f6cd4
Parents: b689624
Author: dkarachentsev <dk...@gridgain.com>
Authored: Thu Mar 30 12:50:42 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Thu Mar 30 12:50:42 2017 +0300
----------------------------------------------------------------------
.../continuous/GridContinuousProcessor.java | 39 ++---
.../ignite/custom/DummyEventFilterFactory.java | 47 ++++++
.../ContinuousQueryPeerClassLoadingTest.java | 142 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite3.java | 2 +
4 files changed, 214 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/357c20ab/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 9fd9b6d..6a4f57d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
@@ -495,29 +496,34 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
UUID clientNodeId = entry.getKey();
- if (!ctx.localNodeId().equals(clientNodeId)) {
+ if (!ctx.clientNode()) {
Map<UUID, LocalRoutineInfo> clientRoutineMap = entry.getValue();
for (Map.Entry<UUID, LocalRoutineInfo> e : clientRoutineMap.entrySet()) {
UUID routineId = e.getKey();
LocalRoutineInfo info = e.getValue();
- try {
- if (info.prjPred != null)
- ctx.resource().injectGeneric(info.prjPred);
-
- if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
- registerHandler(clientNodeId,
- routineId,
- info.hnd,
- info.bufSize,
- info.interval,
- info.autoUnsubscribe,
- false);
+ GridCacheContext cctx = ctx.cache().context().cacheContext(CU.cacheId(info.hnd.cacheName()));
+
+ // Do not register handler if it's not affinity node.
+ if (cctx == null || cctx.affinityNode()) {
+ try {
+ if (info.prjPred != null)
+ ctx.resource().injectGeneric(info.prjPred);
+
+ if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) {
+ registerHandler(clientNodeId,
+ routineId,
+ info.hnd,
+ info.bufSize,
+ info.interval,
+ info.autoUnsubscribe,
+ false);
+ }
+ }
+ catch (IgniteCheckedException err) {
+ U.error(log, "Failed to register continuous handler.", err);
}
- }
- catch (IgniteCheckedException err) {
- U.error(log, "Failed to register continuous handler.", err);
}
}
}
@@ -583,6 +589,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @return Routine ID.
* @throws IgniteCheckedException If failed.
*/
+ @SuppressWarnings("unchecked")
public UUID registerStaticRoutine(
String cacheName,
CacheEntryUpdatedListener<?, ?> locLsnr,
http://git-wip-us.apache.org/repos/asf/ignite/blob/357c20ab/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java b/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java
new file mode 100644
index 0000000..e0688bc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/custom/DummyEventFilterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.custom;
+
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+
+/**
+ * Test factory of a filter that accepts every event. Must not be in org.apache.ignite.internal (NOTE(review): presumably so it is peer-loaded - confirm).
+ */
+public class DummyEventFilterFactory implements Factory<CacheEntryEventFilter<Integer, String>> {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventFilter<Integer, String> create() {
+ return new DummyEventFilter();
+ }
+
+ /**
+ * Event filter that lets every cache entry event through.
+ */
+ private static class DummyEventFilter implements CacheEntryEventFilter<Integer, String> {
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(
+ final CacheEntryEvent<? extends Integer, ? extends String> evt) throws CacheEntryListenerException {
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/357c20ab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
new file mode 100644
index 0000000..e5d1d60
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryPeerClassLoadingTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.custom.DummyEventFilterFactory;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that a remote filter factory is correctly deployed on all nodes when peer class loading is enabled.
+ */
+public class ContinuousQueryPeerClassLoadingTest extends GridCommonAbstractTest {
+ /** Name of the cache used by the test. */
+ public static final String CACHE_NAME = "test-cache";
+
+ /** {@inheritDoc} Enables peer class loading; a node whose name contains "client" starts in client mode. */
+ @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
+ final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setPeerClassLoadingEnabled(true);
+ cfg.setClientMode(gridName.contains("client"));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} Stops every grid started by the test. */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoteFilterFactoryClient() throws Exception {
+ check("server", "client1", "client2");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoteFilterFactoryServer1() throws Exception {
+ check("server1", "server2", "client");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoteFilterFactoryServer2() throws Exception {
+ check("server1", "server2", "server3");
+ }
+
+ /**
+ * @param node1Name Name of the first node started; the cache is created and populated through it.
+ * @param node2Name Name of the second node started; the first continuous query runs on it.
+ * @param node3Name Name of the third node started; the second continuous query runs on it.
+ */
+ private void check(String node1Name, String node2Name, String node3Name) throws Exception {
+ final Ignite node1 = startGrid(node1Name);
+
+ final IgniteCache<Integer, String> cache = node1.getOrCreateCache(CACHE_NAME);
+
+ for (int i = 0; i < 10; i++)
+ cache.put(i, String.valueOf(i));
+
+ final Ignite node2 = startGrid(node2Name);
+
+ final ContinuousQuery<Integer, String> qry1 = new ContinuousQuery<>();
+ final ContinuousQuery<Integer, String> qry2 = new ContinuousQuery<>();
+
+ qry1.setRemoteFilterFactory(new DummyEventFilterFactory());
+ qry2.setRemoteFilterFactory(new DummyEventFilterFactory());
+
+ final AtomicInteger client1Evts = new AtomicInteger(0); // NOTE(review): never read or updated in this method.
+ final AtomicInteger client2Evts = new AtomicInteger(0); // NOTE(review): never read or updated in this method.
+
+ final CountDownLatch latch1 = new CountDownLatch(20); // Query 1 is started before the puts of keys 10..29 (20 updates).
+ final CountDownLatch latch2 = new CountDownLatch(10); // Query 2 is started before the puts of keys 20..29 (10 updates).
+
+ qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
+ @Override public void onUpdated(
+ final Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) throws CacheEntryListenerException {
+ System.out.println(">> Client 1 events " + evts);
+ for (CacheEntryEvent<? extends Integer, ? extends String> evt : evts)
+ latch1.countDown();
+ }
+ });
+
+ qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
+ @Override public void onUpdated(
+ final Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) throws CacheEntryListenerException {
+ System.out.println(">> Client 2 events " + evts);
+ for (CacheEntryEvent<? extends Integer, ? extends String> evt : evts)
+ latch2.countDown();
+ }
+ });
+
+ final IgniteCache<Integer, String> cache1 = node2.cache(CACHE_NAME);
+
+ cache1.query(qry1);
+
+ for (int i = 10; i < 20; i++)
+ cache.put(i, String.valueOf(i));
+
+ // Before the IGNITE-4284 fix, the join failed here when this node was the second client.
+ final Ignite node3 = startGrid(node3Name);
+
+ final IgniteCache<Integer, String> cache2 = node3.cache(CACHE_NAME);
+
+ cache2.query(qry2);
+
+ for (int i = 20; i < 30; i++)
+ cache.put(i, String.valueOf(i));
+
+ assert latch1.await(5, TimeUnit.SECONDS) : latch1.getCount();
+ assert latch2.await(5, TimeUnit.SECONDS) : latch2.getCount();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/357c20ab/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index a865788..6b2fea0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBin
import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationSwapEnabledTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryPeerClassLoadingTest;
import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryRemoteFilterMissingInClassPathSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
@@ -123,6 +124,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
suite.addTestSuite(CacheKeepBinaryIterationNearEnabledTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryBackupQueueTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryNoUnsubscribeTest.class);
+ suite.addTestSuite(ContinuousQueryPeerClassLoadingTest.class);
return suite;
}