You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/11/17 12:53:31 UTC
ignite git commit: IGNITE-6496 Client node should release queue
semaphore on disconnect - Fixes #2981.
Repository: ignite
Updated Branches:
refs/heads/master 38f66c7f7 -> 4e046794c
IGNITE-6496 Client node should release queue semaphore on disconnect - Fixes #2981.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e046794
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e046794
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e046794
Branch: refs/heads/master
Commit: 4e046794c5a0274241c1240d73b9f08cf0a83439
Parents: 38f66c7
Author: Slava Koptilin <sl...@gmail.com>
Authored: Fri Nov 17 15:53:15 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Nov 17 15:53:15 2017 +0300
----------------------------------------------------------------------
.../CacheDataStructuresManager.java | 12 ++
.../datastructures/GridCacheQueueAdapter.java | 5 +
.../GridCacheQueueClientDisconnectTest.java | 117 +++++++++++++++++++
.../IgniteCacheDataStructuresSelfTestSuite.java | 2 +
4 files changed, 136 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e046794/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 0b81e7a..a9b60228 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.NotNull;
@@ -161,6 +162,17 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
q.delegate().onKernalStop();
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture reconnectFut) {
+     super.onDisconnected(reconnectFut);
+
+     // Every queue registered in queuesMap is told about the disconnect so that it can
+     // release whatever its blocking operations are waiting on.
+     for (GridCacheQueueProxy proxy : queuesMap.values()) {
+         proxy.delegate().onClientDisconnected();
+     }
+ }
+
/**
* @param set Set.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e046794/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index c567ac4..e42c00b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -479,6 +479,11 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
onRemoved(true);
}
+ /** Releases all semaphores used in blocking operations. Intended to be called when the client node is disconnected. */
+ public void onClientDisconnected() {
+ releaseSemaphores();
+ }
+
/**
* Marks queue as removed.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e046794/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueClientDisconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueClientDisconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueClientDisconnectTest.java
new file mode 100644
index 0000000..ed54377
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueClientDisconnectTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/** Checks that a client's blocked {@link IgniteQueue#take()} fails with a disconnect exception once the server stops. */
+public class GridCacheQueueClientDisconnectTest extends GridCommonAbstractTest {
+    /** Name of the queue created by the test. */
+    private static final String IGNITE_QUEUE_NAME = "ignite-queue-client-reconnect-test";
+
+    /** IP finder handed to the discovery SPI of every node started by the test. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Failure detection timeout in milliseconds, used for both the regular and the client setting. */
+    private static final int FAILURE_DETECTION_TIMEOUT = 10_000;
+
+    /** When {@code true}, configurations produced by {@link #getConfiguration(String)} are for client nodes. */
+    private boolean clientMode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+        spi.setIpFinder(ipFinder);
+        spi.setClientReconnectDisabled(false);
+
+        cfg.setDiscoverySpi(spi);
+        cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
+        cfg.setClientFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
+
+        if (clientMode)
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /** Creates a collection configuration whose only explicit setting is the given atomicity mode. */
+    private static CollectionConfiguration collectionConfiguration(CacheAtomicityMode cacheAtomicityMode) {
+        CollectionConfiguration colCfg = new CollectionConfiguration();
+
+        colCfg.setAtomicityMode(cacheAtomicityMode);
+
+        return colCfg;
+    }
+
+    /** @throws Exception If failed. */
+    public void testClientDisconnect() throws Exception {
+        try {
+            Ignite server = startGrid(0);
+
+            clientMode = true;
+            Ignite client = startGrid(1);
+
+            awaitPartitionMapExchange();
+
+            final IgniteQueue<Object> queue = client.queue(
+                IGNITE_QUEUE_NAME, 0, collectionConfiguration(CacheAtomicityMode.ATOMIC));
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    try {
+                        queue.take();
+                    }
+                    catch (IgniteClientDisconnectedException ignored) {
+                        latch.countDown();
+                    }
+                    catch (Exception ignored) {
+                        // Deliberately ignored: the latch stays closed, so the assertion below fails the test.
+                    }
+                }
+            });
+
+            // Presumably gives the asynchronous take() time to start blocking before the server is stopped.
+            U.sleep(5000);
+
+            server.close();
+
+            assertTrue("IgniteClientDisconnectedException was not thrown",
+                latch.await(FAILURE_DETECTION_TIMEOUT * 2, TimeUnit.MILLISECONDS));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e046794/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
index 6e16d2e..414f463 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.AtomicCacheAffinityConfigurationTest;
import org.apache.ignite.internal.processors.cache.datastructures.GridCacheQueueCleanupSelfTest;
+import org.apache.ignite.internal.processors.cache.datastructures.GridCacheQueueClientDisconnectTest;
import org.apache.ignite.internal.processors.cache.datastructures.GridCacheQueueMultiNodeConsistencySelfTest;
import org.apache.ignite.internal.processors.cache.datastructures.IgniteClientDataStructuresTest;
import org.apache.ignite.internal.processors.cache.datastructures.IgniteClientDiscoveryDataStructuresTest;
@@ -121,6 +122,7 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueApiSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedQueueMultiNodeSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueMultiNodeSelfTest.class));
+ suite.addTest(new TestSuite(GridCacheQueueClientDisconnectTest.class));
suite.addTest(new TestSuite(GridCachePartitionedQueueCreateMultiNodeSelfTest.class));
suite.addTest(new TestSuite(GridCachePartitionedAtomicQueueCreateMultiNodeSelfTest.class));