You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/01/17 14:17:38 UTC

[ignite-3] branch main updated: IGNITE-15359 Add IgniteClientConfiguration.asyncContinuationExecutor (#558)

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 96e4e7f  IGNITE-15359 Add IgniteClientConfiguration.asyncContinuationExecutor (#558)
96e4e7f is described below

commit 96e4e7f7478ca63a7e6993df5fbcdd010457c827
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Jan 17 17:17:32 2022 +0300

    IGNITE-15359 Add IgniteClientConfiguration.asyncContinuationExecutor (#558)
---
 modules/client/pom.xml                             |  6 +++
 .../org/apache/ignite/client/IgniteClient.java     | 31 ++++++++++++++-
 .../ignite/client/IgniteClientConfiguration.java   | 20 ++++++++++
 .../client/IgniteClientConfigurationImpl.java      | 28 +++++++++++---
 .../ignite/internal/client/TcpClientChannel.java   |  4 +-
 .../apache/ignite/client/AbstractClientTest.java   |  4 +-
 .../apache/ignite/client/ConfigurationTest.java    | 45 ++++++++++++++++++++++
 7 files changed, 127 insertions(+), 11 deletions(-)

diff --git a/modules/client/pom.xml b/modules/client/pom.xml
index e43eb60..fd791ef 100644
--- a/modules/client/pom.xml
+++ b/modules/client/pom.xml
@@ -81,6 +81,12 @@
 
         <!-- Test dependencies -->
         <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-api</artifactId>
             <scope>test</scope>
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
index 034976b..8096696 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -25,6 +25,9 @@ import static org.apache.ignite.internal.client.ClientUtils.sync;
 
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.client.IgniteClientConfigurationImpl;
 import org.apache.ignite.internal.client.TcpIgniteClient;
@@ -69,6 +72,9 @@ public interface IgniteClient extends Ignite {
         /** Reconnect throttling retries. */
         private int reconnectThrottlingRetries = DFLT_RECONNECT_THROTTLING_RETRIES;
 
+        /** Async continuation executor. */
+        private Executor asyncContinuationExecutor;
+
         /**
          * Sets the addresses of Ignite server nodes within a cluster. An address can be an IP address or a hostname, with or without port.
          * If port is not set then Ignite will generate multiple addresses for default port range. See {@link
@@ -80,7 +86,7 @@ public interface IgniteClient extends Ignite {
         public Builder addresses(String... addrs) {
             Objects.requireNonNull(addrs, "addrs is null");
 
-            addresses = addrs;
+            addresses = addrs.clone();
 
             return this;
         }
@@ -167,6 +173,26 @@ public interface IgniteClient extends Ignite {
         }
 
         /**
+         * Sets the async continuation executor.
+         *
+         * <p>When <code>null</code> (default), {@link ForkJoinPool#commonPool()} is used.
+         *
+         * <p>When an async client operation completes, the corresponding {@link java.util.concurrent.CompletableFuture} continuations
+         * (such as {@link java.util.concurrent.CompletableFuture#thenApply(Function)}) will be invoked using this executor.
+         *
+         * <p>Server responses are handled by a dedicated network thread. To ensure optimal performance,
+         * this thread should not perform any extra work, so user-defined continuations are offloaded to the specified executor.
+         *
+         * @param asyncContinuationExecutor Async continuation executor.
+         * @return This instance.
+         */
+        public Builder asyncContinuationExecutor(Executor asyncContinuationExecutor) {
+            this.asyncContinuationExecutor = asyncContinuationExecutor;
+
+            return this;
+        }
+
+        /**
          * Builds the client.
          *
          * @return Ignite client.
@@ -187,7 +213,8 @@ public interface IgniteClient extends Ignite {
                     retryLimit,
                     connectTimeout,
                     reconnectThrottlingPeriod,
-                    reconnectThrottlingRetries);
+                    reconnectThrottlingRetries,
+                    asyncContinuationExecutor);
 
             return TcpIgniteClient.startAsync(cfg);
         }
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
index 3d32c53..dfde409 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
@@ -17,6 +17,11 @@
 
 package org.apache.ignite.client;
 
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
+import org.jetbrains.annotations.Nullable;
+
 /**
  * Ignite client configuration.
  */
@@ -82,4 +87,19 @@ public interface IgniteClientConfiguration {
      * @return Reconnect throttling retries.
      */
     int reconnectThrottlingRetries();
+
+    /**
+     * Gets the async continuation executor.
+     *
+     * <p>When <code>null</code> (default), {@link ForkJoinPool#commonPool()} is used.
+     *
+     * <p>When an async client operation completes, the corresponding {@link java.util.concurrent.CompletableFuture} continuations
+     * (such as {@link java.util.concurrent.CompletableFuture#thenApply(Function)}) will be invoked using this executor.
+     *
+     * <p>Server responses are handled by a dedicated network thread. To ensure optimal performance,
+     * this thread should not perform any extra work, so user-defined continuations are offloaded to the specified executor.
+     *
+     * @return Executor for async continuations.
+     */
+    @Nullable Executor asyncContinuationExecutor();
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
index aaf548d..f084dfc 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.client;
 
+import java.util.concurrent.Executor;
 import org.apache.ignite.client.IgniteClientAddressFinder;
 import org.apache.ignite.client.IgniteClientConfiguration;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Immutable client configuration.
@@ -42,13 +44,17 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
     /** Reconnect throttling retries. */
     private final int reconnectThrottlingRetries;
 
+    /** Async continuation executor. */
+    private final Executor asyncContinuationExecutor;
+
     /**
      * Constructor.
      *
-     * @param addressFinder  Address finder.
-     * @param addresses      Addresses.
-     * @param retryLimit     Retry limit.
-     * @param connectTimeout Socket connect timeout.
+     * @param addressFinder             Address finder.
+     * @param addresses                 Addresses.
+     * @param retryLimit                Retry limit.
+     * @param connectTimeout            Socket connect timeout.
+     * @param asyncContinuationExecutor Async continuation executor.
      */
     public IgniteClientConfigurationImpl(
             IgniteClientAddressFinder addressFinder,
@@ -56,14 +62,18 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
             int retryLimit,
             long connectTimeout,
             long reconnectThrottlingPeriod,
-            int reconnectThrottlingRetries
-    ) {
+            int reconnectThrottlingRetries,
+            Executor asyncContinuationExecutor) {
         this.addressFinder = addressFinder;
+
+        //noinspection AssignmentOrReturnOfFieldWithMutableType (cloned in Builder).
         this.addresses = addresses;
+
         this.retryLimit = retryLimit;
         this.connectTimeout = connectTimeout;
         this.reconnectThrottlingPeriod = reconnectThrottlingPeriod;
         this.reconnectThrottlingRetries = reconnectThrottlingRetries;
+        this.asyncContinuationExecutor = asyncContinuationExecutor;
     }
 
     /** {@inheritDoc} */
@@ -101,4 +111,10 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
     public int reconnectThrottlingRetries() {
         return reconnectThrottlingRetries;
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable Executor asyncContinuationExecutor() {
+        return asyncContinuationExecutor;
+    }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index a0bd791..81041f3 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -90,7 +90,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     TcpClientChannel(ClientChannelConfiguration cfg, ClientConnectionMultiplexer connMgr) {
         validateConfiguration(cfg);
 
-        asyncContinuationExecutor = ForkJoinPool.commonPool();
+        asyncContinuationExecutor = cfg.clientConfiguration().asyncContinuationExecutor() == null
+                ? ForkJoinPool.commonPool()
+                : cfg.clientConfiguration().asyncContinuationExecutor();
 
         connectTimeout = cfg.clientConfiguration().connectTimeout();
 
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 34455c6..673516c 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -41,7 +41,7 @@ public abstract class AbstractClientTest {
 
     protected static Ignite server;
 
-    protected static Ignite client;
+    protected static IgniteClient client;
 
     protected static int serverPort;
 
@@ -86,7 +86,7 @@ public abstract class AbstractClientTest {
      * @param addrs Addresses.
      * @return Client.
      */
-    public static Ignite startClient(String... addrs) {
+    public static IgniteClient startClient(String... addrs) {
         if (addrs == null || addrs.length == 0) {
             addrs = new String[]{"127.0.0.1:" + serverPort};
         }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
index 6dcc305..3a788a9 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
@@ -17,10 +17,16 @@
 
 package org.apache.ignite.client;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.startsWith;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.lang.IgniteException;
 import org.junit.jupiter.api.Test;
 
@@ -124,4 +130,43 @@ public class ConfigurationTest extends AbstractClientTest {
 
         assertThrows(IllegalArgumentException.class, builder::build);
     }
+
+    @Test
+    public void testDefaultAsyncContinuationExecutorIsForkJoinPool() {
+        String threadName = client.tables().tablesAsync().thenApply(unused -> Thread.currentThread().getName()).join();
+
+        assertNull(client.configuration().asyncContinuationExecutor());
+        assertThat(threadName, startsWith("ForkJoinPool.commonPool-worker-"));
+    }
+
+    @Test
+    public void testDirectAsyncContinuationExecutorUsesNettyThread() throws Exception {
+        IgniteClient.Builder builder = IgniteClient.builder()
+                .addresses("127.0.0.1:" + serverPort)
+                .asyncContinuationExecutor(Runnable::run);
+
+        try (Ignite ignite = builder.build()) {
+            String threadName = ignite.tables().tablesAsync().thenApply(unused -> Thread.currentThread().getName()).join();
+
+            assertThat(threadName, startsWith("nioEventLoopGroup-"));
+        }
+    }
+
+    @Test
+    public void testCustomAsyncContinuationExecutor() throws Exception {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        IgniteClient.Builder builder = IgniteClient.builder()
+                .addresses("127.0.0.1:" + serverPort)
+                .asyncContinuationExecutor(executor);
+
+        try (IgniteClient ignite = builder.build()) {
+            String threadName = ignite.tables().tablesAsync().thenApply(unused -> Thread.currentThread().getName()).join();
+
+            assertEquals(executor, ignite.configuration().asyncContinuationExecutor());
+            assertThat(threadName, startsWith("pool-"));
+        }
+
+        executor.shutdown();
+    }
 }