You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/02/13 11:30:14 UTC

[34/40] ignite git commit: ignite-3994 GridContinuousHandler cleanup on client disconnect. This closes #1496.

ignite-3994 GridContinuousHandler cleanup on client disconnect.
This closes #1496.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2f4bdbb6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2f4bdbb6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2f4bdbb6

Branch: refs/heads/ignite-2.0
Commit: 2f4bdbb674e5634ce4c1a3432dede4c865977fde
Parents: 543a65f
Author: vdpyatkov <vp...@gridgain.com>
Authored: Fri Feb 10 15:08:45 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Feb 10 15:09:53 2017 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   5 +
 .../internal/GridMessageListenHandler.java      |   5 +
 .../continuous/CacheContinuousQueryHandler.java |  16 ++
 .../continuous/GridContinuousHandler.java       |   5 +
 .../continuous/GridContinuousProcessor.java     |   3 +
 .../ClientReconnectContinuousQueryTest.java     | 201 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite3.java         |   2 +
 7 files changed, 237 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 68d34ce..0395434 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -392,6 +392,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public void onClientDisconnected() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 0eeaa8a..88d4450 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -199,6 +199,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public void onClientDisconnected() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index a9a7d7c..b3f0684 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -854,6 +854,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null;
     }
 
+    /** {@inheritDoc} */
+    @Override public void onClientDisconnected() {
+        if (internal)
+            return;
+
+        for (PartitionRecovery rec : rcvs.values())
+            rec.resetTopologyCache();
+    }
+
     /**
      * @param ctx Context.
      * @param partId Partition id.
@@ -972,6 +981,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         }
 
         /**
+         * Resets cached topology.
+         */
+        void resetTopologyCache() {
+            curTop = AffinityTopologyVersion.NONE;
+        }
+
+        /**
          * Add continuous entry.
          *
          * @param cctx Cache context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index f14b450..2a3a052 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -99,6 +99,11 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
     public GridContinuousBatch createBatch();
 
     /**
+     * Client node disconnected callback.
+     */
+    public void onClientDisconnected();
+
+    /**
      * Called when ack for a batch is received from client.
      *
      * @param routineId Routine ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 9fd9b6d..b9f42e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -912,6 +912,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 unregisterRemote(e.getKey());
         }
 
+        for (LocalRoutineInfo routine : locInfos.values())
+            routine.hnd.onClientDisconnected();
+
         rmtInfos.clear();
 
         clientInfos.clear();

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
new file mode 100644
index 0000000..feded14
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class ClientReconnectContinuousQueryTest extends GridCommonAbstractTest {
+    /** Client index. */
+    private static final int CLIENT_IDX = 1;
+
+    /** Puts before reconnect. */
+    private static final int PUTS_BEFORE_RECONNECT = 50;
+
+    /** Puts after reconnect. */
+    private static final int PUTS_AFTER_RECONNECT = 50;
+
+    /** Recon latch. */
+    private static final CountDownLatch reconLatch = new CountDownLatch(1);
+
+    /** Discon latch. */
+    private static final CountDownLatch disconLatch = new CountDownLatch(1);
+
+    /** Updater received. */
+    private static final CountDownLatch updaterReceived = new CountDownLatch(PUTS_BEFORE_RECONNECT);
+
+    /** Receiver after reconnect. */
+    private static final CountDownLatch receiverAfterReconnect = new CountDownLatch(PUTS_AFTER_RECONNECT);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+
+        commSpi.setSlowClientQueueLimit(50);
+        commSpi.setIdleConnectionTimeout(300_000);
+
+        if (getTestGridName(CLIENT_IDX).equals(gridName))
+            cfg.setClientMode(true);
+        else {
+            CacheConfiguration ccfg = defaultCacheConfiguration();
+
+            cfg.setCacheConfiguration(ccfg);
+        }
+
+        return cfg;
+    }
+
+    /**
+     * Test client reconnect to alive grid.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientReconnect() throws Exception {
+        try {
+            startGrids(2);
+
+            IgniteEx client = grid(CLIENT_IDX);
+
+            client.events().localListen(new DisconnectListener(), EventType.EVT_CLIENT_NODE_DISCONNECTED);
+
+            client.events().localListen(new ReconnectListener(), EventType.EVT_CLIENT_NODE_RECONNECTED);
+
+            IgniteCache cache = client.cache(null);
+
+            ContinuousQuery qry = new ContinuousQuery();
+
+            qry.setLocalListener(new CQListener());
+
+            cache.query(qry);
+
+            putSomeKeys(PUTS_BEFORE_RECONNECT);
+
+            info("updaterReceived Count: " + updaterReceived.getCount());
+
+            assertTrue(updaterReceived.await(10_000, TimeUnit.MILLISECONDS));
+
+            skipRead(client, true);
+
+            putSomeKeys(1_000);
+
+            assertTrue(disconLatch.await(10_000, TimeUnit.MILLISECONDS));
+
+            skipRead(client, false);
+
+            assertTrue(reconLatch.await(10_000, TimeUnit.MILLISECONDS));
+
+            putSomeKeys(PUTS_AFTER_RECONNECT);
+
+            info("receiverAfterReconnect Count: " + receiverAfterReconnect.getCount());
+
+            assertTrue(receiverAfterReconnect.await(10_000, TimeUnit.MILLISECONDS));
+        }
+        finally {
+            stopAllGrids();
+        }
+
+    }
+
+    /**
+     *
+     */
+    private static class ReconnectListener implements IgnitePredicate<Event> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(Event evt) {
+            reconLatch.countDown();
+
+            return false;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class DisconnectListener implements IgnitePredicate<Event> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(Event evt) {
+            disconLatch.countDown();
+
+            return false;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CQListener implements CacheEntryUpdatedListener {
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+            if (reconLatch.getCount() != 0) {
+                for (Object o : iterable)
+                    updaterReceived.countDown();
+            }
+            else {
+                for (Object o : iterable)
+                    receiverAfterReconnect.countDown();
+            }
+        }
+    }
+
+    /**
+     * @param cnt Number of keys.
+     */
+    private void putSomeKeys(int cnt) {
+        IgniteEx ignite = grid(0);
+
+        IgniteCache<Object, Object> srvCache = ignite.cache(null);
+
+        for (int i = 0; i < cnt; i++)
+            srvCache.put(0, i);
+    }
+
+    /**
+     * @param igniteClient Ignite client.
+     * @param skip Skip.
+     */
+    private void skipRead(IgniteEx igniteClient, boolean skip) {
+        GridIoManager ioMgr = igniteClient.context().io();
+
+        TcpCommunicationSpi commSpi = (TcpCommunicationSpi)((Object[])U.field(ioMgr, "spis"))[0];
+
+        GridNioServer nioSrvr = U.field(commSpi, "nioSrvr");
+
+        GridTestUtils.setFieldValue(nioSrvr, "skipRead", skip);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f4bdbb6/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index a865788..07125a6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBin
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationSwapEnabledTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.ClientReconnectContinuousQueryTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryRemoteFilterMissingInClassPathSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
@@ -123,6 +124,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
         suite.addTestSuite(CacheKeepBinaryIterationNearEnabledTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryBackupQueueTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryNoUnsubscribeTest.class);
+        suite.addTestSuite(ClientReconnectContinuousQueryTest.class);
 
         return suite;
     }