You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/15 14:35:26 UTC
[02/18] ignite git commit: IGNITE-2468
IGNITE-2468
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/725d6cb5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/725d6cb5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/725d6cb5
Branch: refs/heads/ignite-2604
Commit: 725d6cb557684ac8f31dfde8f5fcb4ddb95a18dd
Parents: 763bf57
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Feb 12 14:08:25 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Feb 12 14:08:25 2016 +0300
----------------------------------------------------------------------
.../internal/GridMessageListenHandler.java | 16 ++
.../continuous/GridContinuousProcessor.java | 50 +++--
...eClientReconnectContinuousProcessorTest.java | 32 +++-
...IgniteCacheContinuousQueryReconnectTest.java | 192 +++++++++++++++++++
.../tcp/TcpClientDiscoverySpiSelfTest.java | 2 +-
.../IgniteCacheQuerySelfTestSuite.java | 2 +
6 files changed, 279 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 13aeb54..bf81944 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -83,6 +83,22 @@ public class GridMessageListenHandler implements GridContinuousHandler {
this.pred = pred;
}
+ /**
+ * Copy constructor. Takes the class name, deployment info, predicate, topic and the
+ * {@code predBytes}/{@code topicBytes} fields from {@code orig}. {@code depEnabled} is not
+ * copied: it is always set to {@code false} on the new instance.
+ *
+ * @param orig Handler to be copied.
+ */
+ public GridMessageListenHandler(GridMessageListenHandler orig) {
+ assert orig != null;
+
+ this.clsName = orig.clsName;
+ this.depInfo = orig.depInfo;
+ this.pred = orig.pred;
+ this.predBytes = orig.predBytes;
+ this.topic = orig.topic;
+ this.topicBytes = orig.topicBytes;
+ this.depEnabled = false;
+ }
+
/** {@inheritDoc} */
@Override public boolean isEvents() {
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 0218897..496f820 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridMessageListenHandler;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -428,11 +429,17 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.resource().injectGeneric(item.prjPred);
// Register handler only if local node passes projection predicate.
- if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) {
+ if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) &&
+ !locInfos.containsKey(item.routineId)) {
if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
item.autoUnsubscribe, false))
item.hnd.onListenerRegistered(item.routineId, ctx);
}
+
+ if (!item.autoUnsubscribe)
+ // Register routine locally.
+ locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(
+ item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to register continuous handler.", e);
@@ -854,6 +861,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e);
}
+ GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ?
+ new GridMessageListenHandler((GridMessageListenHandler)hnd) :
+ hnd;
+
if (node.isClient()) {
Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(node.id());
@@ -866,7 +877,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
- hnd,
+ hnd0,
data.bufferSize(),
data.interval(),
data.autoUnsubscribe()));
@@ -881,10 +892,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (prjPred != null)
ctx.resource().injectGeneric(prjPred);
- if (prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) {
- registered = registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(),
+ if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) &&
+ !locInfos.containsKey(routineId)) {
+ registered = registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(),
data.autoUnsubscribe(), false);
}
+
+ if (!data.autoUnsubscribe())
+ // Register routine locally.
+ locInfos.putIfAbsent(routineId, new LocalRoutineInfo(
+ prjPred, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
}
catch (IgniteCheckedException e) {
err = e;
@@ -894,11 +911,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
// Load partition counters.
- if (hnd.isQuery()) {
+ if (hnd0.isQuery()) {
GridCacheProcessor proc = ctx.cache();
if (proc != null) {
- GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
+ GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
if (cache != null && !cache.isLocal()) {
Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
@@ -912,7 +929,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
req.addError(ctx.localNodeId(), err);
if (registered)
- hnd.onListenerRegistered(routineId, ctx);
+ hnd0.onListenerRegistered(routineId, ctx);
}
/**
@@ -1095,22 +1112,31 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
*/
@SuppressWarnings("TooBroadScope")
private void unregisterRemote(UUID routineId) {
- RemoteRoutineInfo info;
+ RemoteRoutineInfo remote;
+ LocalRoutineInfo loc;
stopLock.lock();
try {
- info = rmtInfos.remove(routineId);
+ remote = rmtInfos.remove(routineId);
- if (info == null)
+ loc = locInfos.remove(routineId);
+
+ // No remote registration was found for this id: record it in 'stopped'.
+ if (remote == null)
stopped.add(routineId);
}
finally {
stopLock.unlock();
}
- if (info != null)
- unregisterHandler(routineId, info.hnd, false);
+ if (remote != null)
+ unregisterHandler(routineId, remote.hnd, false);
+ else if (loc != null) {
+ // Removes routine at node started it when stopRoutine called from another node.
+ unregisterHandler(routineId, loc.hnd, false);
+ }
+ // Otherwise the routine was in neither map: the id is already recorded in 'stopped' and there is
+ // no handler to unregister, so do not assert on (or dereference) 'loc' here.
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
index dc94c96..4c44adc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -113,7 +113,21 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
/**
+ * Runs the message listener reconnect scenario with {@code stopFromClient == false}, i.e. the remote
+ * listener is stopped through the {@code srv} node instead of the client.
+ *
* @throws Exception If failed.
*/
- public void testMessageListenerReconnect() throws Exception {
+ public void testMessageListenerReconnectAndStopFromServer() throws Exception {
+ testMessageListenerReconnect(false);
+ }
+
+ /**
+ * Runs the message listener reconnect scenario with {@code stopFromClient == true}, i.e. the remote
+ * listener is stopped through the client node.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMessageListenerReconnectAndStopFromClient() throws Exception {
+ testMessageListenerReconnect(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void testMessageListenerReconnect(boolean stopFromClient) throws Exception {
Ignite client = grid(serverCount());
assertTrue(client.cluster().localNode().isClient());
@@ -166,7 +180,7 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
log.info("Stop listen, should not get remote messages anymore.");
- client.message().stopRemoteListen(opId);
+ (stopFromClient ? client : srv).message().stopRemoteListen(opId);
srv.message().send(topic, "msg3");
@@ -175,6 +189,20 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
assertFalse(latch.await(3000, MILLISECONDS));
+
+ log.info("New nodes should not register stopped listeners.");
+
+ startGrid(serverCount() + 1);
+
+ srv.message().send(topic, "msg4");
+
+ locLsnr.latch = new CountDownLatch(1);
+ latch = new CountDownLatch(1);
+
+ assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+ assertFalse(latch.await(3000, MILLISECONDS));
+
+ stopGrid(serverCount() + 1);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java
new file mode 100644
index 0000000..b1d8a49
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests a continuous query started with auto-unsubscribe disabled. After nodes are stopped and started,
+ * a single cache put is expected to trigger the remote filter the number of times asserted by the test.
+ */
+public class IgniteCacheContinuousQueryReconnectTest extends GridCommonAbstractTest implements Serializable {
+ /** Counts remote filter invocations; reset before every put in {@code putAndCheck}. */
+ private static final AtomicInteger cnt = new AtomicInteger();
+
+ /** When {@code true}, {@link #getConfiguration(String)} switches the produced configuration to client mode. */
+ private volatile boolean isClient = false;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setAtomicityMode(atomicMode());
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setBackups(1);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ if (isClient)
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @return Atomic mode.
+ */
+ protected CacheAtomicityMode atomicMode() {
+ return ATOMIC;
+ }
+
+ /**
+ * Runs the scenario with the query started through the server node's cache.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectServer() throws Exception {
+ testReconnect(false);
+ }
+
+ /**
+ * Runs the scenario with the query started through the client node's cache.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectClient() throws Exception {
+ testReconnect(true);
+ }
+
+ /**
+ * Resets the filter invocation counter, puts one entry and checks how many times the remote filter ran.
+ *
+ * @param cache Cache to put into.
+ * @param diff Expected number of remote filter invocations caused by the put.
+ */
+ private void putAndCheck(IgniteCache<Object, Object> cache, int diff) {
+ cnt.set(0);
+
+ cache.put(1, "1");
+
+ assertEquals(diff, cnt.get());
+ }
+
+ /**
+ * @param clientQuery If {@code true} the query is started through the client's cache, otherwise through
+ * the cache of the first started server.
+ * @throws Exception If failed.
+ */
+ private void testReconnect(boolean clientQuery) throws Exception {
+ Ignite srv1 = startGrid(0);
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+ @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+ // No-op.
+ }
+ });
+
+ qry.setAutoUnsubscribe(false);
+
+ qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Object, Object>() {
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
+ cnt.incrementAndGet();
+
+ return true;
+ }
+ });
+
+ isClient = true;
+
+ Ignite client = startGrid(1);
+
+ isClient = false;
+
+ IgniteCache<Object, Object> cache1 = srv1.cache(null);
+ IgniteCache<Object, Object> clCache = client.cache(null);
+
+ putAndCheck(clCache, 0); // 0 remote listeners.
+
+ QueryCursor<Cache.Entry<Object, Object>> cur = (clientQuery ? clCache : cache1).query(qry);
+
+ putAndCheck(clCache, 1); // 1 remote listener.
+
+ startGrid(2);
+
+ putAndCheck(clCache, 2); // 2 remote listeners.
+
+ stopGrid(0);
+
+ while (true) {
+ try {
+ clCache.get(1);
+
+ break;
+ }
+ catch (IgniteClientDisconnectedException e) {
+ e.reconnectFuture().get(); // Wait for reconnect.
+ }
+ catch (CacheException e) {
+ // Only a client disconnect is worth retrying; anything else must fail the test
+ // instead of being swallowed and retried forever.
+ if (!(e.getCause() instanceof IgniteClientDisconnectedException))
+ throw e;
+
+ ((IgniteClientDisconnectedException)e.getCause()).reconnectFuture().get(); // Wait for reconnect.
+ }
+ }
+
+ putAndCheck(clCache, 1); // 1 remote listener.
+
+ startGrid(3);
+
+ putAndCheck(clCache, 2); // 2 remote listeners.
+
+ stopGrid(1); // Client node.
+
+ isClient = true;
+
+ client = startGrid(4);
+
+ isClient = false;
+
+ clCache = client.cache(null);
+
+ putAndCheck(clCache, 2); // 2 remote listeners.
+
+ startGrid(5);
+
+ putAndCheck(clCache, 3); // 3 remote listeners.
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 030c653..7debb41 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -2050,7 +2050,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public boolean apply(UUID uuid, Object msg) {
- X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']');
+ X.println(">>> Received [node=" + ignite.name() + ", msg=" + msg + ']');
msgLatch.countDown();
http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 3cd4579..cecb8ad 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -100,6 +100,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryReconnectTest;
import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryAtomicSelfTest;
import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryLocalSelfTest;
import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryPartitionedSelfTest;
@@ -200,6 +201,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheContinuousQueryPartitionTxOneNodeTest.class);
suite.addTestSuite(GridCacheContinuousQueryPartitionAtomicOneNodeTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
+ suite.addTestSuite(IgniteCacheContinuousQueryReconnectTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryClientReconnectTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class);
suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.class);