You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2020/03/13 10:36:32 UTC
[ignite] branch master updated: IGNITE-12743 Java thin client:
Fixed thread shutdown on client close when partition awareness is enabled -
Fixes #7522.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ef4f67e IGNITE-12743 Java thin client: Fixed thread shutdown on client close when partition awareness is enabled - Fixes #7522.
ef4f67e is described below
commit ef4f67e351402bf3d01b42c040bffc081b1c4995
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Fri Mar 13 13:17:06 2020 +0300
IGNITE-12743 Java thin client: Fixed thread shutdown on client close when partition awareness is enabled - Fixes #7522.
---
.../internal/client/thin/ReliableChannel.java | 28 ++++++--
.../ThinClientAbstractPartitionAwarenessTest.java | 22 +++++++
...lientPartitionAwarenessResourceReleaseTest.java | 74 ++++++++++++++++++++++
.../org/apache/ignite/client/ClientTestSuite.java | 4 +-
4 files changed, 123 insertions(+), 5 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index e09587b..9ce6ad985 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -48,6 +49,12 @@ import org.jetbrains.annotations.NotNull;
* Communication channel with failover and partition awareness.
*/
final class ReliableChannel implements AutoCloseable {
+ /** Timeout to wait for executor service to shutdown (in milliseconds). */
+ private static final long EXECUTOR_SHUTDOWN_TIMEOUT = 10_000L;
+
+ /** Async runner thread name. */
+ static final String ASYNC_RUNNER_THREAD_NAME = "thin-client-channel-async-runner";
+
/** Channel factory. */
private final Function<ClientChannelConfiguration, ClientChannel> chFactory;
@@ -70,7 +77,11 @@ final class ReliableChannel implements AutoCloseable {
private final ExecutorService asyncRunner = Executors.newSingleThreadExecutor(
new ThreadFactory() {
@Override public Thread newThread(@NotNull Runnable r) {
- return new Thread(r, "thin-client-channel-async-runner");
+ Thread thread = new Thread(r, ASYNC_RUNNER_THREAD_NAME);
+
+ thread.setDaemon(true);
+
+ return thread;
}
}
);
@@ -82,7 +93,7 @@ final class ReliableChannel implements AutoCloseable {
private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
/** Channel is closed. */
- private boolean closed;
+ private volatile boolean closed;
/**
* Constructor.
@@ -137,6 +148,15 @@ final class ReliableChannel implements AutoCloseable {
@Override public synchronized void close() {
closed = true;
+ asyncRunner.shutdown();
+
+ try {
+ asyncRunner.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ignore) {
+ // No-op.
+ }
+
for (ClientChannelHolder hld : channels)
hld.closeChannel();
}
@@ -363,8 +383,8 @@ final class ReliableChannel implements AutoCloseable {
scheduledChannelsReinit.set(false);
for (ClientChannelHolder hld : channels) {
- if (scheduledChannelsReinit.get())
- return; // New reinit task scheduled.
+ if (scheduledChannelsReinit.get() || closed)
+ return; // New reinit task scheduled or channel is closed.
try {
hld.getOrCreateChannel(true);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index 04c961c..8d335f8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -106,6 +107,10 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
super.afterTest();
opsQueue.clear();
+
+ U.closeQuiet(client);
+
+ client = null;
}
/**
@@ -286,6 +291,9 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
/** Channel configuration. */
private final ClientChannelConfiguration cfg;
+ /** Channel is closed. */
+ private volatile boolean closed;
+
/**
* @param cfg Config.
*/
@@ -315,6 +323,20 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
}
/** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ super.close();
+
+ closed = true;
+ }
+
+ /**
+ * Channel is closed.
+ */
+ public boolean isClosed() {
+ return closed;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return cfg.getAddress().toString();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
new file mode 100644
index 0000000..281be74e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.lang.management.ThreadInfo;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.client.thin.ReliableChannel.ASYNC_RUNNER_THREAD_NAME;
+
+/**
+ * Test resource releasing by thin client.
+ */
+public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientAbstractPartitionAwarenessTest {
+ /**
+ * Test that resources are correctly released after closing client with partition awareness.
+ */
+ @Test
+ public void testResourcesReleasedAfterClientClosed() throws Exception {
+ startGrids(2);
+
+ initClient(getClientConfiguration(0, 1), 0, 1);
+
+ ClientCache<Integer, Integer> cache = client.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, i);
+
+ assertFalse(channels[0].isClosed());
+ assertFalse(channels[1].isClosed());
+ assertEquals(1, threadsCount(ASYNC_RUNNER_THREAD_NAME));
+
+ client.close();
+
+ assertTrue(channels[0].isClosed());
+ assertTrue(channels[1].isClosed());
+ assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(ASYNC_RUNNER_THREAD_NAME) == 0, 1_000L));
+ }
+
+ /**
+ * Gets threads count with a given name.
+ */
+ private static int threadsCount(String name) {
+ int cnt = 0;
+
+ long[] threadIds = U.getThreadMx().getAllThreadIds();
+
+ for (long id : threadIds) {
+ ThreadInfo info = U.getThreadMx().getThreadInfo(id);
+
+ if (info != null && info.getThreadState() != Thread.State.TERMINATED && name.equals(info.getThreadName()))
+ cnt++;
+ }
+
+ return cnt;
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 0c6f92e..f0ac9a8 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -17,6 +17,7 @@
package org.apache.ignite.client;
+import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResourceReleaseTest;
import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest;
import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessUnstableTopologyTest;
import org.junit.runner.RunWith;
@@ -41,7 +42,8 @@ import org.junit.runners.Suite;
ConnectToStartingNodeTest.class,
AsyncChannelTest.class,
ThinClientPartitionAwarenessStableTopologyTest.class,
- ThinClientPartitionAwarenessUnstableTopologyTest.class
+ ThinClientPartitionAwarenessUnstableTopologyTest.class,
+ ThinClientPartitionAwarenessResourceReleaseTest.class
})
public class ClientTestSuite {
// No-op.