You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/01/17 14:17:38 UTC
[ignite-3] branch main updated: IGNITE-15359 Add IgniteClientConfiguration.asyncContinuationExecutor (#558)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 96e4e7f IGNITE-15359 Add IgniteClientConfiguration.asyncContinuationExecutor (#558)
96e4e7f is described below
commit 96e4e7f7478ca63a7e6993df5fbcdd010457c827
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Jan 17 17:17:32 2022 +0300
IGNITE-15359 Add IgniteClientConfiguration.asyncContinuationExecutor (#558)
---
modules/client/pom.xml | 6 +++
.../org/apache/ignite/client/IgniteClient.java | 31 ++++++++++++++-
.../ignite/client/IgniteClientConfiguration.java | 20 ++++++++++
.../client/IgniteClientConfigurationImpl.java | 28 +++++++++++---
.../ignite/internal/client/TcpClientChannel.java | 4 +-
.../apache/ignite/client/AbstractClientTest.java | 4 +-
.../apache/ignite/client/ConfigurationTest.java | 45 ++++++++++++++++++++++
7 files changed, 127 insertions(+), 11 deletions(-)
diff --git a/modules/client/pom.xml b/modules/client/pom.xml
index e43eb60..fd791ef 100644
--- a/modules/client/pom.xml
+++ b/modules/client/pom.xml
@@ -81,6 +81,12 @@
<!-- Test dependencies -->
<dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
index 034976b..8096696 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -25,6 +25,9 @@ import static org.apache.ignite.internal.client.ClientUtils.sync;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.client.IgniteClientConfigurationImpl;
import org.apache.ignite.internal.client.TcpIgniteClient;
@@ -69,6 +72,9 @@ public interface IgniteClient extends Ignite {
/** Reconnect throttling retries. */
private int reconnectThrottlingRetries = DFLT_RECONNECT_THROTTLING_RETRIES;
+ /** Async continuation executor. */
+ private Executor asyncContinuationExecutor;
+
/**
* Sets the addresses of Ignite server nodes within a cluster. An address can be an IP address or a hostname, with or without port.
* If port is not set then Ignite will generate multiple addresses for default port range. See {@link
@@ -80,7 +86,7 @@ public interface IgniteClient extends Ignite {
public Builder addresses(String... addrs) {
Objects.requireNonNull(addrs, "addrs is null");
- addresses = addrs;
+ addresses = addrs.clone();
return this;
}
@@ -167,6 +173,26 @@ public interface IgniteClient extends Ignite {
}
/**
+ * Sets the async continuation executor.
+ *
+ * <p>When <code>null</code> (default), {@link ForkJoinPool#commonPool()} is used.
+ *
+ * <p>When an async client operation completes, the corresponding
+ * {@link java.util.concurrent.CompletableFuture} continuations
+ * (such as {@link java.util.concurrent.CompletableFuture#thenApply(Function)}) will be invoked using this executor.
+ *
+ * <p>Server responses are handled by a dedicated network thread. To ensure optimal performance,
+ * this thread should not perform any extra work, so user-defined continuations are offloaded to the specified executor.
+ *
+ * @param asyncContinuationExecutor Async continuation executor, or <code>null</code> to use the default.
+ * @return This instance.
+ */
+ public Builder asyncContinuationExecutor(Executor asyncContinuationExecutor) {
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
+
+ return this;
+ }
+
+ /**
* Builds the client.
*
* @return Ignite client.
@@ -187,7 +213,8 @@ public interface IgniteClient extends Ignite {
retryLimit,
connectTimeout,
reconnectThrottlingPeriod,
- reconnectThrottlingRetries);
+ reconnectThrottlingRetries,
+ asyncContinuationExecutor);
return TcpIgniteClient.startAsync(cfg);
}
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
index 3d32c53..dfde409 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
@@ -17,6 +17,11 @@
package org.apache.ignite.client;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
+import org.jetbrains.annotations.Nullable;
+
/**
* Ignite client configuration.
*/
@@ -82,4 +87,19 @@ public interface IgniteClientConfiguration {
* @return Reconnect throttling retries.
*/
int reconnectThrottlingRetries();
+
+ /**
+ * Gets the async continuation executor.
+ *
+ * <p>When <code>null</code> (default), {@link ForkJoinPool#commonPool()} is used.
+ *
+ * <p>When async client operation completes, corresponding {@link java.util.concurrent.CompletableFuture} continuations
+ * (such as {@link java.util.concurrent.CompletableFuture#thenApply(Function)}) will be invoked using this executor.
+ *
+ * <p>Server responses are handled by a dedicated network thread. To ensure optimal performance,
+ * this thread should not perform any extra work, so user-defined continuations are offloaded to the specified executor.
+ *
+ * @return Executor for async continuations.
+ */
+ @Nullable Executor asyncContinuationExecutor();
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
index aaf548d..f084dfc 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.client;
+import java.util.concurrent.Executor;
import org.apache.ignite.client.IgniteClientAddressFinder;
import org.apache.ignite.client.IgniteClientConfiguration;
+import org.jetbrains.annotations.Nullable;
/**
* Immutable client configuration.
@@ -42,13 +44,17 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
/** Reconnect throttling retries. */
private final int reconnectThrottlingRetries;
+ /** Async continuation executor. */
+ private final Executor asyncContinuationExecutor;
+
/**
* Constructor.
*
- * @param addressFinder Address finder.
- * @param addresses Addresses.
- * @param retryLimit Retry limit.
- * @param connectTimeout Socket connect timeout.
+ * @param addressFinder Address finder.
+ * @param addresses Addresses.
+ * @param retryLimit Retry limit.
+ * @param connectTimeout Socket connect timeout.
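+ * @param reconnectThrottlingPeriod Reconnect throttling period.
+ * @param reconnectThrottlingRetries Reconnect throttling retries.
+ * @param asyncContinuationExecutor Async continuation executor.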
*/
public IgniteClientConfigurationImpl(
IgniteClientAddressFinder addressFinder,
@@ -56,14 +62,18 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
int retryLimit,
long connectTimeout,
long reconnectThrottlingPeriod,
- int reconnectThrottlingRetries
- ) {
+ int reconnectThrottlingRetries,
+ Executor asyncContinuationExecutor) {
this.addressFinder = addressFinder;
+
+ //noinspection AssignmentOrReturnOfFieldWithMutableType (cloned in Builder).
this.addresses = addresses;
+
this.retryLimit = retryLimit;
this.connectTimeout = connectTimeout;
this.reconnectThrottlingPeriod = reconnectThrottlingPeriod;
this.reconnectThrottlingRetries = reconnectThrottlingRetries;
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
}
/** {@inheritDoc} */
@@ -101,4 +111,10 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
public int reconnectThrottlingRetries() {
return reconnectThrottlingRetries;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable Executor asyncContinuationExecutor() {
+ // Returns the configured value as-is; null is not replaced with a default here.
+ return asyncContinuationExecutor;
+ }
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index a0bd791..81041f3 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -90,7 +90,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
TcpClientChannel(ClientChannelConfiguration cfg, ClientConnectionMultiplexer connMgr) {
validateConfiguration(cfg);
- asyncContinuationExecutor = ForkJoinPool.commonPool();
+ asyncContinuationExecutor = cfg.clientConfiguration().asyncContinuationExecutor() == null
+ ? ForkJoinPool.commonPool()
+ : cfg.clientConfiguration().asyncContinuationExecutor();
connectTimeout = cfg.clientConfiguration().connectTimeout();
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 34455c6..673516c 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -41,7 +41,7 @@ public abstract class AbstractClientTest {
protected static Ignite server;
- protected static Ignite client;
+ protected static IgniteClient client;
protected static int serverPort;
@@ -86,7 +86,7 @@ public abstract class AbstractClientTest {
* @param addrs Addresses.
* @return Client.
*/
- public static Ignite startClient(String... addrs) {
+ public static IgniteClient startClient(String... addrs) {
if (addrs == null || addrs.length == 0) {
addrs = new String[]{"127.0.0.1:" + serverPort};
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
index 6dcc305..3a788a9 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
@@ -17,10 +17,16 @@
package org.apache.ignite.client;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.ignite.Ignite;
import org.apache.ignite.lang.IgniteException;
import org.junit.jupiter.api.Test;
@@ -124,4 +130,43 @@ public class ConfigurationTest extends AbstractClientTest {
assertThrows(IllegalArgumentException.class, builder::build);
}
+
+ /**
+  * Checks that, with no executor configured, the configuration reports {@code null}
+  * and a user continuation runs on a common fork-join pool worker thread.
+  */
+ @Test
+ public void testDefaultAsyncContinuationExecutorIsForkJoinPool() {
+     // Capture the name of the thread that executes the user-defined continuation.
+     String continuationThread = client.tables()
+             .tablesAsync()
+             .thenApply(ignored -> Thread.currentThread().getName())
+             .join();
+
+     assertNull(client.configuration().asyncContinuationExecutor());
+     assertThat(continuationThread, startsWith("ForkJoinPool.commonPool-worker-"));
+ }
+
+ /**
+  * Checks that a direct executor ({@code Runnable::run}) makes a user continuation run on a thread
+  * whose name starts with {@code nioEventLoopGroup-}.
+  */
+ @Test
+ public void testDirectAsyncContinuationExecutorUsesNettyThread() throws Exception {
+     try (Ignite ignite = IgniteClient.builder()
+             .addresses("127.0.0.1:" + serverPort)
+             .asyncContinuationExecutor(Runnable::run)
+             .build()) {
+         // Capture the name of the thread that executes the user-defined continuation.
+         String continuationThread = ignite.tables()
+                 .tablesAsync()
+                 .thenApply(ignored -> Thread.currentThread().getName())
+                 .join();
+
+         assertThat(continuationThread, startsWith("nioEventLoopGroup-"));
+     }
+ }
+
+ /**
+  * Checks that a user-supplied executor is exposed through the client configuration
+  * and is the one that runs user continuations.
+  */
+ @Test
+ public void testCustomAsyncContinuationExecutor() throws Exception {
+     ExecutorService executor = Executors.newSingleThreadExecutor();
+
+     try (IgniteClient ignite = IgniteClient.builder()
+             .addresses("127.0.0.1:" + serverPort)
+             .asyncContinuationExecutor(executor)
+             .build()) {
+         String threadName = ignite.tables().tablesAsync().thenApply(unused -> Thread.currentThread().getName()).join();
+
+         assertEquals(executor, ignite.configuration().asyncContinuationExecutor());
+         assertThat(threadName, startsWith("pool-"));
+     } finally {
+         // Runs after the client is closed, and also when build() or an assertion fails,
+         // so the executor's worker thread is never left behind.
+         executor.shutdown();
+     }
+ }
}