You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2020/03/13 10:36:32 UTC

[ignite] branch master updated: IGNITE-12743 Java thin client: Fixed thread shutdown on client close when partition awareness is enabled - Fixes #7522.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ef4f67e  IGNITE-12743 Java thin client: Fixed thread shutdown on client close when partition awareness is enabled - Fixes #7522.
ef4f67e is described below

commit ef4f67e351402bf3d01b42c040bffc081b1c4995
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Fri Mar 13 13:17:06 2020 +0300

    IGNITE-12743 Java thin client: Fixed thread shutdown on client close when partition awareness is enabled - Fixes #7522.
---
 .../internal/client/thin/ReliableChannel.java      | 28 ++++++--
 .../ThinClientAbstractPartitionAwarenessTest.java  | 22 +++++++
 ...lientPartitionAwarenessResourceReleaseTest.java | 74 ++++++++++++++++++++++
 .../org/apache/ignite/client/ClientTestSuite.java  |  4 +-
 4 files changed, 123 insertions(+), 5 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index e09587b..9ce6ad985 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -48,6 +49,12 @@ import org.jetbrains.annotations.NotNull;
  * Communication channel with failover and partition awareness.
  */
 final class ReliableChannel implements AutoCloseable {
+    /** Timeout to wait for executor service to shutdown (in milliseconds). */
+    private static final long EXECUTOR_SHUTDOWN_TIMEOUT = 10_000L;
+
+    /** Async runner thread name. */
+    static final String ASYNC_RUNNER_THREAD_NAME = "thin-client-channel-async-runner";
+
     /** Channel factory. */
     private final Function<ClientChannelConfiguration, ClientChannel> chFactory;
 
@@ -70,7 +77,11 @@ final class ReliableChannel implements AutoCloseable {
     private final ExecutorService asyncRunner = Executors.newSingleThreadExecutor(
         new ThreadFactory() {
             @Override public Thread newThread(@NotNull Runnable r) {
-                return new Thread(r, "thin-client-channel-async-runner");
+                Thread thread = new Thread(r, ASYNC_RUNNER_THREAD_NAME);
+
+                thread.setDaemon(true);
+
+                return thread;
             }
         }
     );
@@ -82,7 +93,7 @@ final class ReliableChannel implements AutoCloseable {
     private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
 
     /** Channel is closed. */
-    private boolean closed;
+    private volatile boolean closed;
 
     /**
      * Constructor.
@@ -137,6 +148,15 @@ final class ReliableChannel implements AutoCloseable {
     @Override public synchronized void close() {
         closed = true;
 
+        asyncRunner.shutdown();
+
+        try {
+            asyncRunner.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException ignore) {
+            // No-op.
+        }
+
         for (ClientChannelHolder hld : channels)
             hld.closeChannel();
     }
@@ -363,8 +383,8 @@ final class ReliableChannel implements AutoCloseable {
                     scheduledChannelsReinit.set(false);
 
                     for (ClientChannelHolder hld : channels) {
-                        if (scheduledChannelsReinit.get())
-                            return; // New reinit task scheduled.
+                        if (scheduledChannelsReinit.get() || closed)
+                            return; // New reinit task scheduled or channel is closed.
 
                         try {
                             hld.getOrCreateChannel(true);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index 04c961c..8d335f8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -106,6 +107,10 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
         super.afterTest();
 
         opsQueue.clear();
+
+        U.closeQuiet(client);
+
+        client = null;
     }
 
     /**
@@ -286,6 +291,9 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
         /** Channel configuration. */
         private final ClientChannelConfiguration cfg;
 
+        /** Channel is closed. */
+        private volatile boolean closed;
+
         /**
          * @param cfg Config.
          */
@@ -315,6 +323,20 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
         }
 
         /** {@inheritDoc} */
+        @Override public void close() throws Exception {
+            super.close();
+
+            closed = true;
+        }
+
+        /**
+         * Channel is closed.
+         */
+        public boolean isClosed() {
+            return closed;
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return cfg.getAddress().toString();
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
new file mode 100644
index 0000000..281be74e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.lang.management.ThreadInfo;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.client.thin.ReliableChannel.ASYNC_RUNNER_THREAD_NAME;
+
+/**
+ * Test resource releasing by thin client.
+ */
+public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientAbstractPartitionAwarenessTest {
+    /**
+     * Test that resources are correctly released after closing client with partition awareness.
+     */
+    @Test
+    public void testResourcesReleasedAfterClientClosed() throws Exception {
+        startGrids(2);
+
+        initClient(getClientConfiguration(0, 1), 0, 1);
+
+        ClientCache<Integer, Integer> cache = client.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, i);
+
+        assertFalse(channels[0].isClosed());
+        assertFalse(channels[1].isClosed());
+        assertEquals(1, threadsCount(ASYNC_RUNNER_THREAD_NAME));
+
+        client.close();
+
+        assertTrue(channels[0].isClosed());
+        assertTrue(channels[1].isClosed());
+        assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(ASYNC_RUNNER_THREAD_NAME) == 0, 1_000L));
+    }
+
+    /**
+     * Gets threads count with a given name.
+     */
+    private static int threadsCount(String name) {
+        int cnt = 0;
+
+        long[] threadIds = U.getThreadMx().getAllThreadIds();
+
+        for (long id : threadIds) {
+            ThreadInfo info = U.getThreadMx().getThreadInfo(id);
+
+            if (info != null && info.getThreadState() != Thread.State.TERMINATED && name.equals(info.getThreadName()))
+                cnt++;
+        }
+
+        return cnt;
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 0c6f92e..f0ac9a8 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.client;
 
+import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResourceReleaseTest;
 import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest;
 import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessUnstableTopologyTest;
 import org.junit.runner.RunWith;
@@ -41,7 +42,8 @@ import org.junit.runners.Suite;
     ConnectToStartingNodeTest.class,
     AsyncChannelTest.class,
     ThinClientPartitionAwarenessStableTopologyTest.class,
-    ThinClientPartitionAwarenessUnstableTopologyTest.class
+    ThinClientPartitionAwarenessUnstableTopologyTest.class,
+    ThinClientPartitionAwarenessResourceReleaseTest.class
 })
 public class ClientTestSuite {
     // No-op.