You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/10 12:10:10 UTC
ignite git commit: ignite-3994 GridContinuousHandler cleanup on
client disconnect. This closes #1496.
Repository: ignite
Updated Branches:
refs/heads/master 543a65fba -> 2f4bdbb67
ignite-3994 GridContinuousHandler cleanup on client disconnect.
This closes #1496.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2f4bdbb6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2f4bdbb6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2f4bdbb6
Branch: refs/heads/master
Commit: 2f4bdbb674e5634ce4c1a3432dede4c865977fde
Parents: 543a65f
Author: vdpyatkov <vp...@gridgain.com>
Authored: Fri Feb 10 15:08:45 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Feb 10 15:09:53 2017 +0300
----------------------------------------------------------------------
.../internal/GridEventConsumeHandler.java | 5 +
.../internal/GridMessageListenHandler.java | 5 +
.../continuous/CacheContinuousQueryHandler.java | 16 ++
.../continuous/GridContinuousHandler.java | 5 +
.../continuous/GridContinuousProcessor.java | 3 +
.../ClientReconnectContinuousQueryTest.java | 201 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite3.java | 2 +
7 files changed, 237 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 68d34ce..0395434 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -392,6 +392,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public void onClientDisconnected() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 0eeaa8a..88d4450 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -199,6 +199,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public void onClientDisconnected() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index a9a7d7c..b3f0684 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -854,6 +854,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null;
}
+ /** {@inheritDoc} Returns immediately when {@code internal} is set; otherwise calls {@code resetTopologyCache()} on every value of {@code rcvs}. */
+ @Override public void onClientDisconnected() {
+ if (internal)
+ return;
+
+ for (PartitionRecovery rec : rcvs.values())
+ rec.resetTopologyCache();
+ }
+
/**
* @param ctx Context.
* @param partId Partition id.
@@ -972,6 +981,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
}
/**
+ * Resets the cached topology version ({@code curTop}) to {@link AffinityTopologyVersion#NONE}.
+ */
+ void resetTopologyCache() {
+ curTop = AffinityTopologyVersion.NONE;
+ }
+
+ /**
* Add continuous entry.
*
* @param cctx Cache context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index f14b450..2a3a052 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -99,6 +99,11 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
public GridContinuousBatch createBatch();
/**
+ * Client node disconnected callback.
+ */
+ public void onClientDisconnected();
+
+ /**
* Called when ack for a batch is received from client.
*
* @param routineId Routine ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 9fd9b6d..b9f42e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -912,6 +912,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
unregisterRemote(e.getKey());
}
+ for (LocalRoutineInfo routine : locInfos.values())
+ routine.hnd.onClientDisconnected();
+
rmtInfos.clear();
clientInfos.clear();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
new file mode 100644
index 0000000..feded14
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
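+/**
+ * Checks that a continuous query registered on a client node keeps delivering updates after the client is disconnected and then reconnects.
+ */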
+public class ClientReconnectContinuousQueryTest extends GridCommonAbstractTest {
+ /** Client index. */
+ private static final int CLIENT_IDX = 1;
+
+ /** Puts before reconnect. */
+ private static final int PUTS_BEFORE_RECONNECT = 50;
+
+ /** Puts after reconnect. */
+ private static final int PUTS_AFTER_RECONNECT = 50;
+
+ /** Recon latch. */
+ private static final CountDownLatch reconLatch = new CountDownLatch(1);
+
+ /** Discon latch. */
+ private static final CountDownLatch disconLatch = new CountDownLatch(1);
+
+ /** Updater received. */
+ private static final CountDownLatch updaterReceived = new CountDownLatch(PUTS_BEFORE_RECONNECT);
+
+ /** Receiver after reconnect. */
+ private static final CountDownLatch receiverAfterReconnect = new CountDownLatch(PUTS_AFTER_RECONNECT);
+
+ /** {@inheritDoc} The grid named for {@code CLIENT_IDX} runs in client mode; every other grid gets the default cache configuration. */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+
+ commSpi.setSlowClientQueueLimit(50);
+ commSpi.setIdleConnectionTimeout(300_000);
+
+ if (getTestGridName(CLIENT_IDX).equals(gridName))
+ cfg.setClientMode(true);
+ else {
+ CacheConfiguration ccfg = defaultCacheConfiguration();
+
+ cfg.setCacheConfiguration(ccfg);
+ }
+
+ return cfg;
+ }
+
+ /**
+ * Checks that the continuous query listener still receives updates after the client disconnects and reconnects to a live grid.
+ *
+ * @throws Exception If failed.
+ */
+ public void testClientReconnect() throws Exception {
+ try {
+ startGrids(2);
+
+ IgniteEx client = grid(CLIENT_IDX);
+
+ client.events().localListen(new DisconnectListener(), EventType.EVT_CLIENT_NODE_DISCONNECTED);
+
+ client.events().localListen(new ReconnectListener(), EventType.EVT_CLIENT_NODE_RECONNECTED);
+
+ IgniteCache cache = client.cache(null);
+
+ ContinuousQuery qry = new ContinuousQuery();
+
+ qry.setLocalListener(new CQListener());
+
+ cache.query(qry);
+
+ putSomeKeys(PUTS_BEFORE_RECONNECT);
+
+ info("updaterReceived Count: " + updaterReceived.getCount());
+
+ assertTrue(updaterReceived.await(10_000, TimeUnit.MILLISECONDS));
+
+ skipRead(client, true);
+
+ putSomeKeys(1_000);
+
+ assertTrue(disconLatch.await(10_000, TimeUnit.MILLISECONDS));
+
+ skipRead(client, false);
+
+ assertTrue(reconLatch.await(10_000, TimeUnit.MILLISECONDS));
+
+ putSomeKeys(PUTS_AFTER_RECONNECT);
+
+ info("receiverAfterReconnect Count: " + receiverAfterReconnect.getCount());
+
+ assertTrue(receiverAfterReconnect.await(10_000, TimeUnit.MILLISECONDS));
+ }
+ finally {
+ stopAllGrids();
+ }
+
+ }
+
+ /**
+ * Event listener that counts down {@code reconLatch} for each event it receives and always returns {@code false}.
+ */
+ private static class ReconnectListener implements IgnitePredicate<Event> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(Event evt) {
+ reconLatch.countDown();
+
+ return false;
+ }
+ }
+
+ /**
+ * Event listener that counts down {@code disconLatch} for each event it receives and always returns {@code false}.
+ */
+ private static class DisconnectListener implements IgnitePredicate<Event> {
+ /** {@inheritDoc} */
+ @Override public boolean apply(Event evt) {
+ disconLatch.countDown();
+
+ return false;
+ }
+ }
+
+ /**
+ * Continuous query listener: counts each entry on {@code updaterReceived} while {@code reconLatch} is still pending, and on {@code receiverAfterReconnect} afterwards.
+ */
+ private static class CQListener implements CacheEntryUpdatedListener {
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+ if (reconLatch.getCount() != 0) {
+ for (Object o : iterable)
+ updaterReceived.countDown();
+ }
+ else {
+ for (Object o : iterable)
+ receiverAfterReconnect.countDown();
+ }
+ }
+ }
+
+ /**
+ * @param cnt Number of puts to perform through the cache of grid 0; every put writes to the same key {@code 0} with values {@code 0..cnt-1}.
+ */
+ private void putSomeKeys(int cnt) {
+ IgniteEx ignite = grid(0);
+
+ IgniteCache<Object, Object> srvCache = ignite.cache(null);
+
+ for (int i = 0; i < cnt; i++)
+ srvCache.put(0, i);
+ }
+
+ /**
+ * @param igniteClient Node whose first communication SPI's {@code nioSrvr} gets its {@code skipRead} field overwritten (fields are looked up by name).
+ * @param skip Value to assign to the NIO server's {@code skipRead} field.
+ */
+ private void skipRead(IgniteEx igniteClient, boolean skip) {
+ GridIoManager ioMgr = igniteClient.context().io();
+
+ TcpCommunicationSpi commSpi = (TcpCommunicationSpi)((Object[])U.field(ioMgr, "spis"))[0];
+
+ GridNioServer nioSrvr = U.field(commSpi, "nioSrvr");
+
+ GridTestUtils.setFieldValue(nioSrvr, "skipRead", skip);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index a865788..07125a6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBin
import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationSwapEnabledTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.ClientReconnectContinuousQueryTest;
import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryRemoteFilterMissingInClassPathSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
@@ -123,6 +124,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
suite.addTestSuite(CacheKeepBinaryIterationNearEnabledTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryBackupQueueTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryNoUnsubscribeTest.class);
+ suite.addTestSuite(ClientReconnectContinuousQueryTest.class);
return suite;
}