You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2023/01/26 11:36:13 UTC
[ignite] branch master updated: IGNITE-18637 Java thin client: Connections balancing - Fixes #10497.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8b825687e66 IGNITE-18637 Java thin client: Connections balancing - Fixes #10497.
8b825687e66 is described below
commit 8b825687e66f42999d7f27401b7df3ff5c733c14
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Thu Jan 26 14:33:31 2023 +0300
IGNITE-18637 Java thin client: Connections balancing - Fixes #10497.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../internal/client/thin/ReliableChannel.java | 49 +++++-
.../internal/client/thin/TcpClientCache.java | 191 ++++++++++++++++-----
.../client/thin/TcpClientTransactions.java | 9 +-
.../client/thin}/FunctionalTest.java | 95 ++++++----
.../internal/client/thin/ReliableChannelTest.java | 36 ++--
.../ThinClientAbstractPartitionAwarenessTest.java | 30 ++--
.../ThinClientPartitionAwarenessBalancingTest.java | 52 ++++++
.../ThinClientPartitionAwarenessDiscoveryTest.java | 17 +-
...lientPartitionAwarenessResourceReleaseTest.java | 2 +-
...ClientPartitionAwarenessStableTopologyTest.java | 26 ++-
...ientPartitionAwarenessUnstableTopologyTest.java | 22 ++-
.../org/apache/ignite/client/ClientTestSuite.java | 3 +
12 files changed, 379 insertions(+), 153 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index d7e0b63d550..6207c7fcd75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -30,7 +31,9 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
@@ -117,6 +120,9 @@ final class ReliableChannel implements AutoCloseable {
/** Cache addresses returned by {@code ThinClientAddressFinder}. */
private volatile String[] prevHostAddrs;
+ /** Open channels counter. */
+ private final AtomicInteger channelsCnt = new AtomicInteger();
+
/**
* Constructor.
*/
@@ -721,8 +727,18 @@ final class ReliableChannel implements AutoCloseable {
dfltChannelIdx = reinitHolders.size() - 1;
}
- if (dfltChannelIdx == -1)
- dfltChannelIdx = 0;
+ if (dfltChannelIdx == -1) {
+ // If the holder is not specified, pick a random holder from the range of holders with the same (lowest) port.
+ reinitHolders.sort(Comparator.comparingInt(h -> h.getAddress().getPort()));
+
+ int limit = 0;
+ int port = reinitHolders.get(0).getAddress().getPort();
+
+ while (limit + 1 < reinitHolders.size() && reinitHolders.get(limit + 1).getAddress().getPort() == port)
+ limit++;
+
+ dfltChannelIdx = ThreadLocalRandom.current().nextInt(limit + 1);
+ }
curChannelsGuard.writeLock().lock();
@@ -803,7 +819,21 @@ final class ReliableChannel implements AutoCloseable {
curChannelsGuard.readLock().lock();
try {
- hld = channels.get(curChIdx);
+ if (!partitionAwarenessEnabled || channelsCnt.get() <= 1 || attempt != 0)
+ hld = channels.get(curChIdx);
+ else {
+ // Make the first attempt with a random open channel.
+ int idx = ThreadLocalRandom.current().nextInt(channels.size());
+ int idx0 = idx;
+
+ do {
+ hld = channels.get(idx);
+
+ if (++idx == channels.size())
+ idx = 0;
+ }
+ while (hld.ch == null && idx != idx0);
+ }
}
finally {
curChannelsGuard.readLock().unlock();
@@ -1010,6 +1040,8 @@ final class ReliableChannel implements AutoCloseable {
}
ch = channel;
+
+ channelsCnt.incrementAndGet();
}
}
@@ -1024,6 +1056,8 @@ final class ReliableChannel implements AutoCloseable {
U.closeQuiet(ch);
ch = null;
+
+ channelsCnt.decrementAndGet();
}
}
@@ -1055,7 +1089,7 @@ final class ReliableChannel implements AutoCloseable {
}
/**
- * Get holders reference. For test purposes.ClientOperation
+ * Get holders reference. For test purposes.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") // For tests.
List<ClientChannelHolder> getChannelHolders() {
@@ -1070,6 +1104,13 @@ final class ReliableChannel implements AutoCloseable {
return nodeChannels;
}
+ /**
+ * Get index of current (default) channel holder. For test purposes.
+ */
+ int getCurrentChannelIndex() {
+ return curChIdx;
+ }
+
/**
* Get scheduledChannelsReinit reference. For test purposes.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index fee4d55edb6..62dd9f1d83e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.cache.Cache;
@@ -42,6 +43,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientDisconnectListener;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
@@ -228,9 +230,11 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
if (keys.isEmpty())
return true;
- return ch.service(
+ TcpClientTransaction tx = transactions.tx();
+
+ return txAwareService(null, tx,
ClientOperation.CACHE_CONTAINS_KEYS,
- req -> writeKeys(keys, req),
+ req -> writeKeys(keys, req, tx),
res -> res.in().readBoolean());
}
@@ -242,9 +246,11 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
if (keys.isEmpty())
return IgniteClientFutureImpl.completedFuture(true);
- return ch.serviceAsync(
+ TcpClientTransaction tx = transactions.tx();
+
+ return txAwareServiceAsync(null, tx,
ClientOperation.CACHE_CONTAINS_KEYS,
- req -> writeKeys(keys, req),
+ req -> writeKeys(keys, req, tx),
res -> res.in().readBoolean());
}
@@ -303,7 +309,12 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
if (keys.isEmpty())
return new HashMap<>();
- return ch.service(ClientOperation.CACHE_GET_ALL, req -> writeKeys(keys, req), this::readEntries);
+ TcpClientTransaction tx = transactions.tx();
+
+ return txAwareService(null, tx,
+ ClientOperation.CACHE_GET_ALL,
+ req -> writeKeys(keys, req, tx),
+ this::readEntries);
}
/** {@inheritDoc} */
@@ -314,7 +325,13 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
if (keys.isEmpty())
return IgniteClientFutureImpl.completedFuture(new HashMap<>());
- return ch.serviceAsync(ClientOperation.CACHE_GET_ALL, req -> writeKeys(keys, req), this::readEntries);
+ TcpClientTransaction tx = transactions.tx();
+
+ return txAwareServiceAsync(null, tx,
+ ClientOperation.CACHE_GET_ALL,
+ req -> writeKeys(keys, req, tx),
+ this::readEntries);
+
}
/** {@inheritDoc} */
@@ -325,12 +342,28 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
if (map.isEmpty())
return;
- ch.request(ClientOperation.CACHE_PUT_ALL, req -> writeEntries(map, req));
+ TcpClientTransaction tx = transactions.tx();
+
+ txAwareService(null, tx,
+ ClientOperation.CACHE_PUT_ALL,
+ req -> writeEntries(map, req, tx),
+ null);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) throws ClientException {
- return ch.requestAsync(ClientOperation.CACHE_PUT_ALL, req -> writeEntries(map, req));
+ if (map == null)
+ throw new NullPointerException("map");
+
+ if (map.isEmpty())
+ return IgniteClientFutureImpl.completedFuture(null);
+
+ TcpClientTransaction tx = transactions.tx();
+
+ return txAwareServiceAsync(null, tx,
+ ClientOperation.CACHE_PUT_ALL,
+ req -> writeEntries(map, req, tx),
+ null);
}
/** {@inheritDoc} */
@@ -475,11 +508,14 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
if (keys.isEmpty())
return;
- ch.request(
+ TcpClientTransaction tx = transactions.tx();
+
+ txAwareService(null, tx,
ClientOperation.CACHE_REMOVE_KEYS,
req -> {
- writeKeys(keys, req);
- }
+ writeKeys(keys, req, tx);
+ },
+ null
);
}
@@ -491,11 +527,14 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
if (keys.isEmpty())
return IgniteClientFutureImpl.completedFuture(null);
- return ch.requestAsync(
- ClientOperation.CACHE_REMOVE_KEYS,
- req -> {
- writeKeys(keys, req);
- }
+ TcpClientTransaction tx = transactions.tx();
+
+ return txAwareServiceAsync(null, tx,
+ ClientOperation.CACHE_REMOVE_KEYS,
+ req -> {
+ writeKeys(keys, req, tx);
+ },
+ null
);
}
@@ -707,9 +746,12 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
if (keys.isEmpty())
return;
- ch.request(
+ TcpClientTransaction tx = transactions.tx();
+
+ txAwareService(null, tx,
ClientOperation.CACHE_CLEAR_KEYS,
- req -> writeKeys(keys, req)
+ req -> writeKeys(keys, req, tx),
+ null
);
}
@@ -721,9 +763,12 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
if (keys.isEmpty())
return IgniteClientFutureImpl.completedFuture(null);
- return ch.requestAsync(
+ TcpClientTransaction tx = transactions.tx();
+
+ return txAwareServiceAsync(null, tx,
ClientOperation.CACHE_CLEAR_KEYS,
- req -> writeKeys(keys, req)
+ req -> writeKeys(keys, req, tx),
+ null
);
}
@@ -1062,6 +1107,68 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
));
}
+ /**
+ * Execute operation on channel most suitable for transactional context.
+ */
+ private <T> T txAwareService(
+ @Nullable K affKey,
+ TcpClientTransaction tx,
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader
+ ) {
+ // A transactional operation cannot be executed on the affinity node; it must be executed on the node that
+ // started the transaction.
+ if (tx != null) {
+ try {
+ return tx.clientChannel().service(op, payloadWriter, payloadReader);
+ }
+ catch (ClientConnectionException e) {
+ throw new ClientException("Transaction context has been lost due to connection errors. " +
+ "Cache operations are prohibited until current transaction closed.", e);
+ }
+ }
+ else if (affKey != null)
+ return ch.affinityService(cacheId, affKey, op, payloadWriter, payloadReader);
+ else
+ return ch.service(op, payloadWriter, payloadReader);
+ }
+
+ /**
+ * Execute operation on channel most suitable for transactional context.
+ */
+ private <T> IgniteClientFuture<T> txAwareServiceAsync(
+ @Nullable K affKey,
+ TcpClientTransaction tx,
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader
+ ) {
+ // A transactional operation cannot be executed on the affinity node; it must be executed on the node that
+ // started the transaction.
+ if (tx != null) {
+ CompletableFuture<T> fut = new CompletableFuture<>();
+
+ tx.clientChannel().serviceAsync(op, payloadWriter, payloadReader).whenComplete((res, err) -> {
+ if (err instanceof ClientConnectionException) {
+ fut.completeExceptionally(
+ new ClientException("Transaction context has been lost due to connection errors. " +
+ "Cache operations are prohibited until current transaction closed.", err));
+ }
+ else if (err != null)
+ fut.completeExceptionally(err);
+ else
+ fut.complete(res);
+ });
+
+ return new IgniteClientFutureImpl<>(fut);
+ }
+ else if (affKey != null)
+ return ch.affinityServiceAsync(cacheId, affKey, op, payloadWriter, payloadReader);
+ else
+ return ch.serviceAsync(op, payloadWriter, payloadReader);
+ }
+
/**
* Execute cache operation with a single key.
*/
@@ -1071,18 +1178,17 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
Consumer<PayloadOutputChannel> additionalPayloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException {
+ TcpClientTransaction tx = transactions.tx();
+
Consumer<PayloadOutputChannel> payloadWriter = req -> {
- writeCacheInfo(req);
+ writeCacheInfo(req, tx);
writeObject(req, key);
if (additionalPayloadWriter != null)
additionalPayloadWriter.accept(req);
};
- // Transactional operation cannot be executed on affinity node, it should be executed on node started
- // the transaction.
- return transactions.tx() == null ? ch.affinityService(cacheId, key, op, payloadWriter, payloadReader) :
- ch.service(op, payloadWriter, payloadReader);
+ return txAwareService(key, tx, op, payloadWriter, payloadReader);
}
/**
@@ -1094,31 +1200,32 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
Consumer<PayloadOutputChannel> additionalPayloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException {
+ TcpClientTransaction tx = transactions.tx();
+
Consumer<PayloadOutputChannel> payloadWriter = req -> {
- writeCacheInfo(req);
+ writeCacheInfo(req, tx);
writeObject(req, key);
if (additionalPayloadWriter != null)
additionalPayloadWriter.accept(req);
};
- // Transactional operation cannot be executed on affinity node, it should be executed on node started
- // the transaction.
- return transactions.tx() == null
- ? ch.affinityServiceAsync(cacheId, key, op, payloadWriter, payloadReader)
- : ch.serviceAsync(op, payloadWriter, payloadReader);
+ return txAwareServiceAsync(key, tx, op, payloadWriter, payloadReader);
}
- /** Write cache ID and flags. */
+ /** Write cache ID and flags for non-transactional operations. */
private void writeCacheInfo(PayloadOutputChannel payloadCh) {
+ writeCacheInfo(payloadCh, null);
+ }
+
+ /** Write cache ID and flags. */
+ private void writeCacheInfo(PayloadOutputChannel payloadCh, TcpClientTransaction tx) {
BinaryOutputStream out = payloadCh.out();
out.writeInt(cacheId);
byte flags = keepBinary ? KEEP_BINARY_FLAG_MASK : 0;
- TcpClientTransaction tx = transactions.tx();
-
if (expiryPlc != null) {
ProtocolContext protocolCtx = payloadCh.clientChannel().protocolCtx();
@@ -1130,14 +1237,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
flags |= WITH_EXPIRY_POLICY_FLAG_MASK;
}
- if (tx != null) {
- if (tx.clientChannel() != payloadCh.clientChannel()) {
- throw new ClientException("Transaction context has been lost due to connection errors. " +
- "Cache operations are prohibited until current transaction closed.");
- }
-
+ if (tx != null)
flags |= TRANSACTIONAL_FLAG_MASK;
- }
out.writeByte(flags);
@@ -1177,8 +1278,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
}
/** */
- private void writeKeys(Set<? extends K> keys, PayloadOutputChannel req) {
- writeCacheInfo(req);
+ private void writeKeys(Set<? extends K> keys, PayloadOutputChannel req, TcpClientTransaction tx) {
+ writeCacheInfo(req, tx);
ClientUtils.collection(keys, req.out(), serDes::writeObject);
}
@@ -1196,8 +1297,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
}
/** */
- private void writeEntries(Map<? extends K, ? extends V> map, PayloadOutputChannel req) {
- writeCacheInfo(req);
+ private void writeEntries(Map<? extends K, ? extends V> map, PayloadOutputChannel req, TcpClientTransaction tx) {
+ writeCacheInfo(req, tx);
ClientUtils.collection(
map.entrySet(),
req.out(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
index 3552968c370..1b72dab3674 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client.thin;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientTransaction;
import org.apache.ignite.client.ClientTransactions;
@@ -238,15 +239,15 @@ class TcpClientTransactions implements ClientTransactions {
*/
private void endTx(boolean committed) {
try {
- ch.service(ClientOperation.TX_END,
+ clientCh.service(ClientOperation.TX_END,
req -> {
- if (clientCh != req.clientChannel())
- throw new ClientException("Transaction context has been lost due to connection errors");
-
req.out().writeInt(txId);
req.out().writeBoolean(committed);
}, null);
}
+ catch (ClientConnectionException e) {
+ throw new ClientException("Transaction context has been lost due to connection errors", e);
+ }
finally {
txMap.remove(txUid);
diff --git a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/FunctionalTest.java
similarity index 96%
rename from modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
rename to modules/core/src/test/java/org/apache/ignite/internal/client/thin/FunctionalTest.java
index 792a9980e30..c278fc5f3d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/FunctionalTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.client;
+package org.apache.ignite.internal.client.thin;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
@@ -57,6 +57,16 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientTransaction;
+import org.apache.ignite.client.Comparers;
+import org.apache.ignite.client.Config;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.LocalIgniteCluster;
+import org.apache.ignite.client.Person;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
@@ -66,15 +76,12 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.AbstractBinaryArraysTest;
-import org.apache.ignite.internal.client.thin.ClientServerError;
import org.apache.ignite.internal.processors.cache.CacheEnumOperationsAbstractTest.TestEnum;
-import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.mxbean.ClientProcessorMXBean;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.apache.ignite.spi.systemview.view.TransactionView;
import org.apache.ignite.testframework.GridTestUtils;
@@ -828,8 +835,11 @@ public class FunctionalTest extends AbstractBinaryArraysTest {
*/
@Test
public void testTransactions() throws Exception {
- try (Ignite ignite = Ignition.start(Config.getServerConfiguration());
- IgniteClient client = Ignition.startClient(getClientConfiguration())
+ int clusterSize = 3;
+
+ try (LocalIgniteCluster cluster = LocalIgniteCluster.start(clusterSize);
+ IgniteClient client = Ignition.startClient(getClientConfiguration()
+ .setAddresses(cluster.clientAddresses().toArray(new String[clusterSize])))
) {
ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
.setName("cache")
@@ -937,13 +947,10 @@ public class FunctionalTest extends AbstractBinaryArraysTest {
cache.put(1, "value5");
// Test failover.
- ClientProcessorMXBean mxBean = getMxBean(ignite.name(), "Clients",
- ClientListenerProcessor.class, ClientProcessorMXBean.class);
-
try (ClientTransaction tx = client.transactions().txStart()) {
cache.put(1, "value6");
- mxBean.dropAllConnections();
+ ((TcpClientTransactions.TcpClientTransaction)tx).clientChannel().close();
try {
cache.put(1, "value7");
@@ -1157,9 +1164,35 @@ public class FunctionalTest extends AbstractBinaryArraysTest {
assertEquals("value23", cache.get(0));
}
+ // Test that new transaction can be started after commit of the previous one without closing.
+ ClientTransaction tx = client.transactions().txStart();
+ tx.commit();
+
+ tx = client.transactions().txStart();
+ tx.rollback();
+
+ // Test that new transaction can be started after rollback of the previous one without closing.
+ tx = client.transactions().txStart();
+ tx.commit();
+
+ // Test that an implicit transaction can be started after commit of the previous one without closing.
+ cache.put(0, "value24");
+
+ GridTestUtils.runAsync(() -> assertEquals("value24", cache.get(0))).get();
+ }
+ }
+
+ /**
+ * Test the limit of active transactions per connection.
+ */
+ @Test
+ public void testTransactionsLimit() throws Exception {
+ try (IgniteEx ignite = (IgniteEx)Ignition.start(Config.getServerConfiguration());
+ IgniteClient client = Ignition.startClient(getClientConfiguration())
+ ) {
// Test active transactions limit.
- int txLimit = ignite.configuration().getClientConnectorConfiguration().getThinClientConfiguration()
- .getMaxActiveTxPerConnection();
+ int txLimit = ignite.configuration().getClientConnectorConfiguration()
+ .getThinClientConfiguration().getMaxActiveTxPerConnection();
List<ClientTransaction> txs = new ArrayList<>(txLimit);
@@ -1181,22 +1214,6 @@ public class FunctionalTest extends AbstractBinaryArraysTest {
for (ClientTransaction tx : txs)
tx.close();
-
- // Test that new transaction can be started after commit of the previous one without closing.
- ClientTransaction tx = client.transactions().txStart();
- tx.commit();
-
- tx = client.transactions().txStart();
- tx.rollback();
-
- // Test that new transaction can be started after rollback of the previous one without closing.
- tx = client.transactions().txStart();
- tx.commit();
-
- // Test that implicit transaction started after commit of previous one without closing.
- cache.put(0, "value24");
-
- GridTestUtils.runAsync(() -> assertEquals("value24", cache.get(0))).get();
}
}
@@ -1205,8 +1222,11 @@ public class FunctionalTest extends AbstractBinaryArraysTest {
*/
@Test
public void testTransactionsAsync() throws Exception {
- try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
- IgniteClient client = Ignition.startClient(getClientConfiguration())
+ int clusterSize = 3;
+
+ try (LocalIgniteCluster cluster = LocalIgniteCluster.start(clusterSize);
+ IgniteClient client = Ignition.startClient(getClientConfiguration()
+ .setAddresses(cluster.clientAddresses().toArray(new String[clusterSize])))
) {
ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
.setName("cache")
@@ -1240,6 +1260,21 @@ public class FunctionalTest extends AbstractBinaryArraysTest {
}
assertEquals("value2", cache.get(1));
+
+ // Test multi-key operations.
+ try (ClientTransaction tx = client.transactions().txStart()) {
+ cache.putAllAsync(F.asMap(1, "value3", 2, "value3")).get();
+
+ assertEquals(F.asMap(1, "value3", 2, "value3"),
+ cache.getAllAsync(new HashSet<>(F.asList(1, 2))).get());
+
+ cache.removeAllAsync(new HashSet<>(F.asList(1, 2))).get();
+
+ assertFalse(cache.containsKeysAsync(new HashSet<>(F.asList(1, 2))).get());
+ }
+
+ assertEquals("value2", cache.get(1));
+ assertFalse(cache.containsKey(2));
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
index 713979a5303..77038a09a7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
@@ -20,9 +20,11 @@ package org.apache.ignite.internal.client.thin;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
@@ -77,7 +79,7 @@ public class ReliableChannelTest {
/**
* Checks that in case if address specified without port, the default port will be processed first
- * */
+ */
@Test
public void testAddressWithoutPort() {
ClientConfiguration ccfg = new ClientConfiguration().setAddresses("127.0.0.1");
@@ -88,28 +90,38 @@ public class ReliableChannelTest {
assertEquals(ClientConnectorConfiguration.DFLT_PORT_RANGE + 1, rc.getChannelHolders().size());
- assertEquals(ClientConnectorConfiguration.DFLT_PORT, rc.getChannelHolders().iterator().next().getAddress().getPort());
+ assertEquals(ClientConnectorConfiguration.DFLT_PORT, F.first(rc.getChannelHolders()).getAddress().getPort());
+
+ assertEquals(0, rc.getCurrentChannelIndex());
}
/**
- * Checks that ReliableChannel provides channels in the same order as in ClientConfiguration.
- * */
+ * Checks that ReliableChannel chooses random address as default from the set of addresses with the same (minimal) port.
+ */
@Test
- public void testAddressesOrder() {
- String[] addrs = new String[] {"127.0.0.1:10803", "127.0.0.1:10802", "127.0.0.1:10801", "127.0.0.1:10800"};
+ public void testDefaultChannelBalancing() {
+ assertEquals(new HashSet<>(F.asList("127.0.0.2:10800", "127.0.0.3:10800", "127.0.0.4:10800")),
+ usedDefaultChannels("127.0.0.1:10801..10809", "127.0.0.2", "127.0.0.3:10800", "127.0.0.4:10800..10809"));
+
+ assertEquals(new HashSet<>(F.asList("127.0.0.1:10800", "127.0.0.2:10800", "127.0.0.3:10800", "127.0.0.4:10800")),
+ usedDefaultChannels("127.0.0.1:10800", "127.0.0.2:10800", "127.0.0.3:10800", "127.0.0.4:10800"));
+ }
+ /** */
+ private Set<String> usedDefaultChannels(String... addrs) {
ClientConfiguration ccfg = new ClientConfiguration().setAddresses(addrs);
- ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+ Set<String> usedChannels = new HashSet<>();
- rc.channelsInit();
+ for (int i = 0; i < 100; i++) {
+ ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
- List<ReliableChannel.ClientChannelHolder> holders = rc.getChannelHolders();
+ rc.channelsInit();
- assertEquals(addrs.length, holders.size());
+ usedChannels.add(rc.getChannelHolders().get(rc.getCurrentChannelIndex()).getAddress().toString());
+ }
- for (int i = 0; i < addrs.length; i++)
- assertEquals(addrs[i], holders.get(i).getAddress().toString());
+ return usedChannels;
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index ba2624aeef3..9ffaa31a84a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT;
@@ -83,9 +84,6 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
/** Operations queue. */
protected final Queue<T2<TestTcpClientChannel, ClientOperation>> opsQueue = new ConcurrentLinkedQueue<>();
- /** Default channel. */
- protected TestTcpClientChannel dfltCh;
-
/** Client instance. */
protected IgniteClient client;
@@ -143,7 +141,7 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
/**
* Checks that operation goes through specified channel.
*/
- protected void assertOpOnChannel(TestTcpClientChannel expCh, ClientOperation expOp) {
+ protected void assertOpOnChannel(@Nullable TestTcpClientChannel expCh, ClientOperation expOp) {
T2<TestTcpClientChannel, ClientOperation> nextChOp = opsQueue.poll();
assertNotNull("Unexpected (null) next operation [expCh=" + expCh + ", expOp=" + expOp + ']', nextChOp);
@@ -151,8 +149,10 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
assertEquals("Unexpected operation on channel [expCh=" + expCh + ", expOp=" + expOp +
", nextOpCh=" + nextChOp + ']', expOp, nextChOp.get2());
- assertEquals("Unexpected channel for operation [expCh=" + expCh + ", expOp=" + expOp +
- ", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1());
+ if (expCh != null) {
+ assertEquals("Unexpected channel for operation [expCh=" + expCh + ", expOp=" + expOp +
+ ", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1());
+ }
}
/**
@@ -175,7 +175,7 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
return channels[i];
}
- return dfltCh;
+ return null;
}
/**
@@ -212,25 +212,19 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
awaitChannelsInit(chIdxs);
- initDefaultChannel();
+ opsQueue.clear();
}
/**
- *
+ * Trigger client to detect topology change.
*/
- protected void initDefaultChannel() {
+ protected void detectTopologyChange() {
opsQueue.clear();
- // Send non-affinity request to determine default channel.
+ // Send non-affinity request to detect topology change.
client.getOrCreateCache(REPL_CACHE_NAME);
- T2<TestTcpClientChannel, ClientOperation> nextChOp = opsQueue.poll();
-
- assertNotNull(nextChOp);
-
- assertEquals(nextChOp.get2(), ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME);
-
- dfltCh = nextChOp.get1();
+ assertOpOnChannel(null, ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java
new file mode 100644
index 00000000000..f89dffe8c3f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.BitSet;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.junit.Test;
+
+/**
+ * Tests that the thin client spreads non-affinity requests across all of its connections.
+ */
+public class ThinClientPartitionAwarenessBalancingTest extends ThinClientAbstractPartitionAwarenessTest {
+ /** Sends 100 non-affinity requests and checks that exactly channels 0, 1 and 2 served them. */
+ @Test
+ public void testConnectionDistribution() throws Exception {
+ startGrids(3);
+
+ initClient(getClientConfiguration(0, 1, 2, 3), 0, 1, 2);
+
+ BitSet usedConnections = new BitSet();
+
+ for (int i = 0; i < 100; i++)
+ client.cacheNames(); // Non-affinity requests should be randomly distributed among connections.
+
+ List<TestTcpClientChannel> channelList = F.asList(channels);
+
+ while (!opsQueue.isEmpty()) {
+ T2<TestTcpClientChannel, ClientOperation> op = opsQueue.poll();
+
+ usedConnections.set(channelList.indexOf(op.get1()));
+ }
+
+ assertEquals(BitSet.valueOf(new byte[] {7}), usedConnections); // 7 = 0b111, i.e. bits {0, 1, 2} are set.
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java
index d7611842ab8..dff4d8ea118 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java
@@ -123,12 +123,15 @@ public class ThinClientPartitionAwarenessDiscoveryTest extends ThinClientAbstrac
clientCache.put(i, i);
if (i == 0)
- assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+ assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
- assertTrue(channelHits.containsKey(opCh));
- channelHits.compute(opCh, (c, old) -> true);
+ if (opCh != null) {
+ assertTrue(channelHits.containsKey(opCh));
+
+ channelHits.compute(opCh, (c, old) -> true);
+ }
}
assertFalse(channelHits.containsValue(false));
@@ -156,12 +159,4 @@ public class ThinClientPartitionAwarenessDiscoveryTest extends ThinClientAbstrac
.setAddressesFinder(addrFinder)
.setPartitionAwarenessEnabled(true);
}
-
- /**
- * Trigger client to detect topology change.
- */
- private void detectTopologyChange() {
- // Send non-affinity request to detect topology change.
- initDefaultChannel();
- }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
index 279d31865cf..6de86591c82 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
@@ -88,7 +88,7 @@ public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
clientCache.put(0, 0);
TestTcpClientChannel opCh = affinityChannel(0, gridCache);
- assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+ assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
for (int i = 1; i < KEY_CNT; i++)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
index 5556da02ee5..b94062b0988 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
@@ -148,8 +148,8 @@ public class ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
clientCache.put(keyForUnknownNode, 0);
- assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
- assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+ assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
+ assertOpOnChannel(null, ClientOperation.CACHE_PUT);
}
/**
@@ -191,18 +191,14 @@ public class ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
for (int i = 0; i < GRIDS_CNT; i++) {
int part = grid(i).affinity(PART_CACHE_NAME).primaryPartitions(grid(i).localNode())[0];
- // Client doesn't have connection with grid(0).
- TestTcpClientChannel ch = i == 0 ? dfltCh : nodeChannel(grid(i).localNode().id());
+ TestTcpClientChannel ch = nodeChannel(grid(i).localNode().id());
// Test scan query with specified partition.
clientCache.query(new ScanQuery<>().setPartition(part)).getAll();
+ // Client doesn't have connection with grid(0), ch will be null for this grid
+ // and operation on any channel is acceptable.
assertOpOnChannel(ch, ClientOperation.QUERY_SCAN);
-
- // Test scan query without specified partition.
- clientCache.query(new ScanQuery<>()).getAll();
-
- assertOpOnChannel(dfltCh, ClientOperation.QUERY_SCAN);
}
}
@@ -345,17 +341,17 @@ public class ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
// After first response we should send partitions request on default channel together with next request.
cache.put(0, 0);
- assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
- assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+ assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
+ assertOpOnChannel(null, ClientOperation.CACHE_PUT);
for (int i = 1; i < KEY_CNT; i++) {
cache.put(i, i);
- assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+ assertOpOnChannel(null, ClientOperation.CACHE_PUT);
cache.get(i);
- assertOpOnChannel(dfltCh, ClientOperation.CACHE_GET);
+ assertOpOnChannel(null, ClientOperation.CACHE_GET);
}
}
@@ -371,9 +367,7 @@ public class ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
TestTcpClientChannel opCh = affinityChannel(keyFactory.apply(0), igniteCache);
- // Default channel is the first who detects topology change, so next partition request will go through
- // the default channel.
- assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+ assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
for (int i = 1; i < KEY_CNT; i++) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
index e167ab1652c..dfe93a97c69 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
@@ -60,7 +60,7 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
awaitChannelsInit(3);
- assertOpOnChannel(dfltCh, ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME);
+ assertOpOnChannel(null, ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME);
Integer key = primaryKey(grid(3).cache(PART_CACHE_NAME));
@@ -68,9 +68,7 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
cache.put(key, 0);
- // Cache partitions are requested from default channel, since it's first (and currently the only) channel
- // which detects new topology.
- assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+ assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
assertOpOnChannel(channels[3], ClientOperation.CACHE_PUT);
@@ -98,8 +96,8 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
awaitPartitionMapExchange();
- // Next request will also detect topology change.
- initDefaultChannel();
+ // Detect topology change.
+ detectTopologyChange();
// Test partition awareness after node join.
testPartitionAwareness(true);
@@ -119,8 +117,8 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
// Test partition awareness before connection to node lost.
testPartitionAwareness(true);
- // Choose node to disconnect (not default node).
- int disconnectNodeIdx = dfltCh == channels[0] ? 1 : 0;
+ // Choose node to disconnect.
+ int disconnectNodeIdx = 0;
// Drop all thin connections from the node.
getMxBean(grid(disconnectNodeIdx).name(), "Clients",
@@ -137,8 +135,8 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
cache.put(key, 0);
- // Request goes to default channel, since affinity node is disconnected.
- assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+ // Request goes to the connected channel, since affinity node is disconnected.
+ assertOpOnChannel(channels[1], ClientOperation.CACHE_PUT);
cache.put(key, 0);
@@ -175,7 +173,7 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
// Send any request to failover.
client.cache(REPL_CACHE_NAME).put(0, 0);
- initDefaultChannel();
+ detectTopologyChange();
awaitChannelsInit(0, 1);
@@ -197,7 +195,7 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
clientCache.put(i, i);
if (partReq) {
- assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+ assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
partReq = false;
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 4a97cd5d1ee..83b6f0fa86d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -24,12 +24,14 @@ import org.apache.ignite.internal.client.thin.ClusterApiTest;
import org.apache.ignite.internal.client.thin.ClusterGroupTest;
import org.apache.ignite.internal.client.thin.ComputeTaskTest;
import org.apache.ignite.internal.client.thin.DataReplicationOperationsTest;
+import org.apache.ignite.internal.client.thin.FunctionalTest;
import org.apache.ignite.internal.client.thin.IgniteSetTest;
import org.apache.ignite.internal.client.thin.MetadataRegistrationTest;
import org.apache.ignite.internal.client.thin.OptimizedMarshallerClassesCachedTest;
import org.apache.ignite.internal.client.thin.ReliableChannelTest;
import org.apache.ignite.internal.client.thin.ServicesBinaryArraysTests;
import org.apache.ignite.internal.client.thin.ServicesTest;
+import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessBalancingTest;
import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessDiscoveryTest;
import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResourceReleaseTest;
import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest;
@@ -69,6 +71,7 @@ import org.junit.runners.Suite;
ThinClientPartitionAwarenessUnstableTopologyTest.class,
ThinClientPartitionAwarenessResourceReleaseTest.class,
ThinClientPartitionAwarenessDiscoveryTest.class,
+ ThinClientPartitionAwarenessBalancingTest.class,
ReliableChannelTest.class,
CacheAsyncTest.class,
TimeoutTest.class,