You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/11/17 12:53:31 UTC

ignite git commit: IGNITE-6496 Client node should release queue semaphore on disconnect - Fixes #2981.

Repository: ignite
Updated Branches:
  refs/heads/master 38f66c7f7 -> 4e046794c


IGNITE-6496 Client node should release queue semaphore on disconnect - Fixes #2981.

Signed-off-by: Alexey Goncharuk <al...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e046794
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e046794
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e046794

Branch: refs/heads/master
Commit: 4e046794c5a0274241c1240d73b9f08cf0a83439
Parents: 38f66c7
Author: Slava Koptilin <sl...@gmail.com>
Authored: Fri Nov 17 15:53:15 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Nov 17 15:53:15 2017 +0300

----------------------------------------------------------------------
 .../CacheDataStructuresManager.java             |  12 ++
 .../datastructures/GridCacheQueueAdapter.java   |   5 +
 .../GridCacheQueueClientDisconnectTest.java     | 117 +++++++++++++++++++
 .../IgniteCacheDataStructuresSelfTestSuite.java |   2 +
 4 files changed, 136 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4e046794/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 0b81e7a..a9b60228 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.jetbrains.annotations.NotNull;
@@ -161,6 +162,17 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
             q.delegate().onKernalStop();
     }
 
+    /** {@inheritDoc} <p> Additionally notifies each queue in {@code queuesMap} of the disconnect. */
+    @Override public void onDisconnected(IgniteFuture reconnectFut) {
+        super.onDisconnected(reconnectFut);
+
+        for (Map.Entry<IgniteUuid, GridCacheQueueProxy> e : queuesMap.entrySet()) {
+            GridCacheQueueProxy queue = e.getValue();
+
+            queue.delegate().onClientDisconnected();
+        }
+    }
+
     /**
      * @param set Set.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e046794/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index c567ac4..e42c00b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -479,6 +479,11 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
             onRemoved(true);
     }
 
+    /** Releases all semaphores used in blocking operations when the client node disconnects. */
+    public void onClientDisconnected() {
+        releaseSemaphores();
+    }
+
     /**
      * Marks queue as removed.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e046794/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueClientDisconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueClientDisconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueClientDisconnectTest.java
new file mode 100644
index 0000000..ed54377
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueClientDisconnectTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+public class GridCacheQueueClientDisconnectTest extends GridCommonAbstractTest {
+    /** Name of the queue created by the test. */
+    private static final String IGNITE_QUEUE_NAME = "ignite-queue-client-reconnect-test";
+
+    /** IP finder handed to the discovery SPI of every node started by this test. */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Value passed to both failure detection timeout setters; the test waits up to twice this many milliseconds. */
+    private static final int FAILURE_DETECTION_TIMEOUT = 10_000;
+
+    /** When {@code true}, nodes configured by {@code getConfiguration} are started in client mode. */
+    private boolean clientMode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder);
+
+        spi.setClientReconnectDisabled(false);
+
+        cfg.setDiscoverySpi(spi);
+
+        cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
+        cfg.setClientFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
+
+        if (clientMode)
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+    /** Creates a collection configuration that uses the given cache atomicity mode. */
+    private static CollectionConfiguration collectionConfiguration(CacheAtomicityMode cacheAtomicityMode) {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setAtomicityMode(cacheAtomicityMode);
+
+        return colCfg;
+    }
+    /** Verifies that a pending client-side {@code take()} fails with a disconnect exception once the server stops. */
+    public void testClientDisconnect() throws Exception {
+        try {
+            Ignite server = startGrid(0);
+
+            clientMode = true;
+
+            Ignite client = startGrid(1);
+
+            awaitPartitionMapExchange();
+
+            final IgniteQueue queue = client.queue(
+                IGNITE_QUEUE_NAME, 0, collectionConfiguration(CacheAtomicityMode.ATOMIC));
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    try {
+                        Object value = queue.take();
+                    }
+                    catch (IgniteClientDisconnectedException icd) {
+                        latch.countDown();
+                    }
+                    catch (Exception e) { // Ignored: the latch is never counted down, so the assertion below fails.
+                    }
+                }
+            });
+            // Presumably gives the async task time to enter take() before the server is stopped.
+            U.sleep(5000);
+
+            server.close();
+            // Wait up to twice the failure detection timeout for the disconnect exception to be observed.
+            boolean countReachedZero = latch.await(FAILURE_DETECTION_TIMEOUT * 2, TimeUnit.MILLISECONDS);
+
+            assertTrue("IgniteClientDisconnectedException was not thrown", countReachedZero);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e046794/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
index 6e16d2e..414f463 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.AtomicCacheAffinityConfigurationTest;
 import org.apache.ignite.internal.processors.cache.datastructures.GridCacheQueueCleanupSelfTest;
+import org.apache.ignite.internal.processors.cache.datastructures.GridCacheQueueClientDisconnectTest;
 import org.apache.ignite.internal.processors.cache.datastructures.GridCacheQueueMultiNodeConsistencySelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteClientDataStructuresTest;
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteClientDiscoveryDataStructuresTest;
@@ -121,6 +122,7 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueApiSelfTest.class));
         suite.addTest(new TestSuite(GridCachePartitionedQueueMultiNodeSelfTest.class));
         suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueMultiNodeSelfTest.class));
+        suite.addTest(new TestSuite(GridCacheQueueClientDisconnectTest.class));
 
         suite.addTest(new TestSuite(GridCachePartitionedQueueCreateMultiNodeSelfTest.class));
         suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.class));