You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2023/01/26 11:36:13 UTC

[ignite] branch master updated: IGNITE-18637 Java thin client: Connections balancing - Fixes #10497.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b825687e66 IGNITE-18637 Java thin client: Connections balancing - Fixes #10497.
8b825687e66 is described below

commit 8b825687e66f42999d7f27401b7df3ff5c733c14
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Thu Jan 26 14:33:31 2023 +0300

    IGNITE-18637 Java thin client: Connections balancing - Fixes #10497.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../internal/client/thin/ReliableChannel.java      |  49 +++++-
 .../internal/client/thin/TcpClientCache.java       | 191 ++++++++++++++++-----
 .../client/thin/TcpClientTransactions.java         |   9 +-
 .../client/thin}/FunctionalTest.java               |  95 ++++++----
 .../internal/client/thin/ReliableChannelTest.java  |  36 ++--
 .../ThinClientAbstractPartitionAwarenessTest.java  |  30 ++--
 .../ThinClientPartitionAwarenessBalancingTest.java |  52 ++++++
 .../ThinClientPartitionAwarenessDiscoveryTest.java |  17 +-
 ...lientPartitionAwarenessResourceReleaseTest.java |   2 +-
 ...ClientPartitionAwarenessStableTopologyTest.java |  26 ++-
 ...ientPartitionAwarenessUnstableTopologyTest.java |  22 ++-
 .../org/apache/ignite/client/ClientTestSuite.java  |   3 +
 12 files changed, 379 insertions(+), 153 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index d7e0b63d550..6207c7fcd75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -30,7 +31,9 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiFunction;
@@ -117,6 +120,9 @@ final class ReliableChannel implements AutoCloseable {
     /** Cache addresses returned by {@code ThinClientAddressFinder}. */
     private volatile String[] prevHostAddrs;
 
+    /** Open channels counter. */
+    private final AtomicInteger channelsCnt = new AtomicInteger();
+
     /**
      * Constructor.
      */
@@ -721,8 +727,18 @@ final class ReliableChannel implements AutoCloseable {
                 dfltChannelIdx = reinitHolders.size() - 1;
         }
 
-        if (dfltChannelIdx == -1)
-            dfltChannelIdx = 0;
+        if (dfltChannelIdx == -1) {
+            // If the default holder is not specified, pick a random holder among the holders sharing the lowest port.
+            reinitHolders.sort(Comparator.comparingInt(h -> h.getAddress().getPort()));
+
+            int limit = 0;
+            int port = reinitHolders.get(0).getAddress().getPort();
+
+            while (limit + 1 < reinitHolders.size() && reinitHolders.get(limit + 1).getAddress().getPort() == port)
+                limit++;
+
+            dfltChannelIdx = ThreadLocalRandom.current().nextInt(limit + 1);
+        }
 
         curChannelsGuard.writeLock().lock();
 
@@ -803,7 +819,21 @@ final class ReliableChannel implements AutoCloseable {
                 curChannelsGuard.readLock().lock();
 
                 try {
-                    hld = channels.get(curChIdx);
+                    if (!partitionAwarenessEnabled || channelsCnt.get() <= 1 || attempt != 0)
+                        hld = channels.get(curChIdx);
+                    else {
+                        // Make the first attempt with a random open channel.
+                        int idx = ThreadLocalRandom.current().nextInt(channels.size());
+                        int idx0 = idx;
+
+                        do {
+                            hld = channels.get(idx);
+
+                            if (++idx == channels.size())
+                                idx = 0;
+                        }
+                        while (hld.ch == null && idx != idx0);
+                    }
                 }
                 finally {
                     curChannelsGuard.readLock().unlock();
@@ -1010,6 +1040,8 @@ final class ReliableChannel implements AutoCloseable {
                     }
 
                     ch = channel;
+
+                    channelsCnt.incrementAndGet();
                 }
             }
 
@@ -1024,6 +1056,8 @@ final class ReliableChannel implements AutoCloseable {
                 U.closeQuiet(ch);
 
                 ch = null;
+
+                channelsCnt.decrementAndGet();
             }
         }
 
@@ -1055,7 +1089,7 @@ final class ReliableChannel implements AutoCloseable {
     }
 
     /**
-     * Get holders reference. For test purposes.ClientOperation
+     * Get holders reference. For test purposes.
      */
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") // For tests.
     List<ClientChannelHolder> getChannelHolders() {
@@ -1070,6 +1104,13 @@ final class ReliableChannel implements AutoCloseable {
         return nodeChannels;
     }
 
+    /**
+     * Get index of current (default) channel holder. For test purposes.
+     */
+    int getCurrentChannelIndex() {
+        return curChIdx;
+    }
+
     /**
      * Get scheduledChannelsReinit reference. For test purposes.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index fee4d55edb6..62dd9f1d83e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import javax.cache.Cache;
@@ -42,6 +43,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.client.ClientCache;
 import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.ClientConnectionException;
 import org.apache.ignite.client.ClientDisconnectListener;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
@@ -228,9 +230,11 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (keys.isEmpty())
             return true;
 
-        return ch.service(
+        TcpClientTransaction tx = transactions.tx();
+
+        return txAwareService(null, tx,
             ClientOperation.CACHE_CONTAINS_KEYS,
-            req -> writeKeys(keys, req),
+            req -> writeKeys(keys, req, tx),
             res -> res.in().readBoolean());
     }
 
@@ -242,9 +246,11 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (keys.isEmpty())
             return IgniteClientFutureImpl.completedFuture(true);
 
-        return ch.serviceAsync(
+        TcpClientTransaction tx = transactions.tx();
+
+        return txAwareServiceAsync(null, tx,
             ClientOperation.CACHE_CONTAINS_KEYS,
-            req -> writeKeys(keys, req),
+            req -> writeKeys(keys, req, tx),
             res -> res.in().readBoolean());
     }
 
@@ -303,7 +309,12 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (keys.isEmpty())
             return new HashMap<>();
 
-        return ch.service(ClientOperation.CACHE_GET_ALL, req -> writeKeys(keys, req), this::readEntries);
+        TcpClientTransaction tx = transactions.tx();
+
+        return txAwareService(null, tx,
+            ClientOperation.CACHE_GET_ALL,
+            req -> writeKeys(keys, req, tx),
+            this::readEntries);
     }
 
     /** {@inheritDoc} */
@@ -314,7 +325,13 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (keys.isEmpty())
             return IgniteClientFutureImpl.completedFuture(new HashMap<>());
 
-        return ch.serviceAsync(ClientOperation.CACHE_GET_ALL, req -> writeKeys(keys, req), this::readEntries);
+        TcpClientTransaction tx = transactions.tx();
+
+        return txAwareServiceAsync(null, tx,
+            ClientOperation.CACHE_GET_ALL,
+            req -> writeKeys(keys, req, tx),
+            this::readEntries);
+
     }
 
     /** {@inheritDoc} */
@@ -325,12 +342,28 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (map.isEmpty())
             return;
 
-        ch.request(ClientOperation.CACHE_PUT_ALL, req -> writeEntries(map, req));
+        TcpClientTransaction tx = transactions.tx();
+
+        txAwareService(null, tx,
+            ClientOperation.CACHE_PUT_ALL,
+            req -> writeEntries(map, req, tx),
+            null);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteClientFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) throws ClientException {
-        return ch.requestAsync(ClientOperation.CACHE_PUT_ALL, req -> writeEntries(map, req));
+        if (map == null)
+            throw new NullPointerException("map");
+
+        if (map.isEmpty())
+            return IgniteClientFutureImpl.completedFuture(null);
+
+        TcpClientTransaction tx = transactions.tx();
+
+        return txAwareServiceAsync(null, tx,
+            ClientOperation.CACHE_PUT_ALL,
+            req -> writeEntries(map, req, tx),
+            null);
     }
 
     /** {@inheritDoc} */
@@ -475,11 +508,14 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (keys.isEmpty())
             return;
 
-        ch.request(
+        TcpClientTransaction tx = transactions.tx();
+
+        txAwareService(null, tx,
             ClientOperation.CACHE_REMOVE_KEYS,
             req -> {
-                writeKeys(keys, req);
-            }
+                writeKeys(keys, req, tx);
+            },
+            null
         );
     }
 
@@ -491,11 +527,14 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (keys.isEmpty())
             return IgniteClientFutureImpl.completedFuture(null);
 
-        return ch.requestAsync(
-                ClientOperation.CACHE_REMOVE_KEYS,
-                req -> {
-                    writeKeys(keys, req);
-                }
+        TcpClientTransaction tx = transactions.tx();
+
+        return txAwareServiceAsync(null, tx,
+            ClientOperation.CACHE_REMOVE_KEYS,
+            req -> {
+                writeKeys(keys, req, tx);
+            },
+            null
         );
     }
 
@@ -707,9 +746,12 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (keys.isEmpty())
             return;
 
-        ch.request(
+        TcpClientTransaction tx = transactions.tx();
+
+        txAwareService(null, tx,
             ClientOperation.CACHE_CLEAR_KEYS,
-            req -> writeKeys(keys, req)
+            req -> writeKeys(keys, req, tx),
+            null
         );
     }
 
@@ -721,9 +763,12 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (keys.isEmpty())
             return IgniteClientFutureImpl.completedFuture(null);
 
-        return ch.requestAsync(
+        TcpClientTransaction tx = transactions.tx();
+
+        return txAwareServiceAsync(null, tx,
             ClientOperation.CACHE_CLEAR_KEYS,
-            req -> writeKeys(keys, req)
+            req -> writeKeys(keys, req, tx),
+            null
         );
     }
 
@@ -1062,6 +1107,68 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
         ));
     }
 
+    /**
+     * Execute operation on channel most suitable for transactional context.
+     */
+    private <T> T txAwareService(
+        @Nullable K affKey,
+        TcpClientTransaction tx,
+        ClientOperation op,
+        Consumer<PayloadOutputChannel> payloadWriter,
+        Function<PayloadInputChannel, T> payloadReader
+    ) {
+        // A transactional operation cannot be executed on the affinity node; it must be executed on the node that
+        // started the transaction.
+        if (tx != null) {
+            try {
+                return tx.clientChannel().service(op, payloadWriter, payloadReader);
+            }
+            catch (ClientConnectionException e) {
+                throw new ClientException("Transaction context has been lost due to connection errors. " +
+                    "Cache operations are prohibited until current transaction closed.", e);
+            }
+        }
+        else if (affKey != null)
+            return ch.affinityService(cacheId, affKey, op, payloadWriter, payloadReader);
+        else
+            return ch.service(op, payloadWriter, payloadReader);
+    }
+
+    /**
+     * Execute operation on channel most suitable for transactional context.
+     */
+    private <T> IgniteClientFuture<T> txAwareServiceAsync(
+        @Nullable K affKey,
+        TcpClientTransaction tx,
+        ClientOperation op,
+        Consumer<PayloadOutputChannel> payloadWriter,
+        Function<PayloadInputChannel, T> payloadReader
+    ) {
+        // A transactional operation cannot be executed on the affinity node; it must be executed on the node that
+        // started the transaction.
+        if (tx != null) {
+            CompletableFuture<T> fut = new CompletableFuture<>();
+
+            tx.clientChannel().serviceAsync(op, payloadWriter, payloadReader).whenComplete((res, err) -> {
+                if (err instanceof ClientConnectionException) {
+                    fut.completeExceptionally(
+                        new ClientException("Transaction context has been lost due to connection errors. " +
+                            "Cache operations are prohibited until current transaction closed.", err));
+                }
+                else if (err != null)
+                    fut.completeExceptionally(err);
+                else
+                    fut.complete(res);
+            });
+
+            return new IgniteClientFutureImpl<>(fut);
+        }
+        else if (affKey != null)
+            return ch.affinityServiceAsync(cacheId, affKey, op, payloadWriter, payloadReader);
+        else
+            return ch.serviceAsync(op, payloadWriter, payloadReader);
+    }
+
     /**
      * Execute cache operation with a single key.
      */
@@ -1071,18 +1178,17 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
         Consumer<PayloadOutputChannel> additionalPayloadWriter,
         Function<PayloadInputChannel, T> payloadReader
     ) throws ClientException {
+        TcpClientTransaction tx = transactions.tx();
+
         Consumer<PayloadOutputChannel> payloadWriter = req -> {
-            writeCacheInfo(req);
+            writeCacheInfo(req, tx);
             writeObject(req, key);
 
             if (additionalPayloadWriter != null)
                 additionalPayloadWriter.accept(req);
         };
 
-        // Transactional operation cannot be executed on affinity node, it should be executed on node started
-        // the transaction.
-        return transactions.tx() == null ? ch.affinityService(cacheId, key, op, payloadWriter, payloadReader) :
-            ch.service(op, payloadWriter, payloadReader);
+        return txAwareService(key, tx, op, payloadWriter, payloadReader);
     }
 
     /**
@@ -1094,31 +1200,32 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
         Consumer<PayloadOutputChannel> additionalPayloadWriter,
         Function<PayloadInputChannel, T> payloadReader
     ) throws ClientException {
+        TcpClientTransaction tx = transactions.tx();
+
         Consumer<PayloadOutputChannel> payloadWriter = req -> {
-            writeCacheInfo(req);
+            writeCacheInfo(req, tx);
             writeObject(req, key);
 
             if (additionalPayloadWriter != null)
                 additionalPayloadWriter.accept(req);
         };
 
-        // Transactional operation cannot be executed on affinity node, it should be executed on node started
-        // the transaction.
-        return transactions.tx() == null
-                ? ch.affinityServiceAsync(cacheId, key, op, payloadWriter, payloadReader)
-                : ch.serviceAsync(op, payloadWriter, payloadReader);
+        return txAwareServiceAsync(key, tx, op, payloadWriter, payloadReader);
     }
 
-    /** Write cache ID and flags. */
+    /** Write cache ID and flags for non-transactional operations. */
     private void writeCacheInfo(PayloadOutputChannel payloadCh) {
+        writeCacheInfo(payloadCh, null);
+    }
+
+    /** Write cache ID and flags. */
+    private void writeCacheInfo(PayloadOutputChannel payloadCh, TcpClientTransaction tx) {
         BinaryOutputStream out = payloadCh.out();
 
         out.writeInt(cacheId);
 
         byte flags = keepBinary ? KEEP_BINARY_FLAG_MASK : 0;
 
-        TcpClientTransaction tx = transactions.tx();
-
         if (expiryPlc != null) {
             ProtocolContext protocolCtx = payloadCh.clientChannel().protocolCtx();
 
@@ -1130,14 +1237,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
             flags |= WITH_EXPIRY_POLICY_FLAG_MASK;
         }
 
-        if (tx != null) {
-            if (tx.clientChannel() != payloadCh.clientChannel()) {
-                throw new ClientException("Transaction context has been lost due to connection errors. " +
-                    "Cache operations are prohibited until current transaction closed.");
-            }
-
+        if (tx != null)
             flags |= TRANSACTIONAL_FLAG_MASK;
-        }
 
         out.writeByte(flags);
 
@@ -1177,8 +1278,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
     }
 
     /** */
-    private void writeKeys(Set<? extends K> keys, PayloadOutputChannel req) {
-        writeCacheInfo(req);
+    private void writeKeys(Set<? extends K> keys, PayloadOutputChannel req, TcpClientTransaction tx) {
+        writeCacheInfo(req, tx);
         ClientUtils.collection(keys, req.out(), serDes::writeObject);
     }
 
@@ -1196,8 +1297,8 @@ public class TcpClientCache<K, V> implements ClientCache<K, V> {
     }
 
     /** */
-    private void writeEntries(Map<? extends K, ? extends V> map, PayloadOutputChannel req) {
-        writeCacheInfo(req);
+    private void writeEntries(Map<? extends K, ? extends V> map, PayloadOutputChannel req, TcpClientTransaction tx) {
+        writeCacheInfo(req, tx);
         ClientUtils.collection(
                 map.entrySet(),
                 req.out(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
index 3552968c370..1b72dab3674 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientTransactions.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client.thin;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.client.ClientConnectionException;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.ClientTransaction;
 import org.apache.ignite.client.ClientTransactions;
@@ -238,15 +239,15 @@ class TcpClientTransactions implements ClientTransactions {
          */
         private void endTx(boolean committed) {
             try {
-                ch.service(ClientOperation.TX_END,
+                clientCh.service(ClientOperation.TX_END,
                     req -> {
-                        if (clientCh != req.clientChannel())
-                            throw new ClientException("Transaction context has been lost due to connection errors");
-
                         req.out().writeInt(txId);
                         req.out().writeBoolean(committed);
                     }, null);
             }
+            catch (ClientConnectionException e) {
+                throw new ClientException("Transaction context has been lost due to connection errors", e);
+            }
             finally {
                 txMap.remove(txUid);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/FunctionalTest.java
similarity index 96%
rename from modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
rename to modules/core/src/test/java/org/apache/ignite/internal/client/thin/FunctionalTest.java
index 792a9980e30..c278fc5f3d8 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/FunctionalTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/FunctionalTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.client;
+package org.apache.ignite.internal.client.thin;
 
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
@@ -57,6 +57,16 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientTransaction;
+import org.apache.ignite.client.Comparers;
+import org.apache.ignite.client.Config;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.LocalIgniteCluster;
+import org.apache.ignite.client.Person;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.ClientConnectorConfiguration;
@@ -66,15 +76,12 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.AbstractBinaryArraysTest;
-import org.apache.ignite.internal.client.thin.ClientServerError;
 import org.apache.ignite.internal.processors.cache.CacheEnumOperationsAbstractTest.TestEnum;
-import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
 import org.apache.ignite.internal.processors.platform.client.ClientStatus;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.mxbean.ClientProcessorMXBean;
 import org.apache.ignite.spi.systemview.view.SystemView;
 import org.apache.ignite.spi.systemview.view.TransactionView;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -828,8 +835,11 @@ public class FunctionalTest extends AbstractBinaryArraysTest {
      */
     @Test
     public void testTransactions() throws Exception {
-        try (Ignite ignite = Ignition.start(Config.getServerConfiguration());
-             IgniteClient client = Ignition.startClient(getClientConfiguration())
+        int clusterSize = 3;
+
+        try (LocalIgniteCluster cluster = LocalIgniteCluster.start(clusterSize);
+             IgniteClient client = Ignition.startClient(getClientConfiguration()
+                 .setAddresses(cluster.clientAddresses().toArray(new String[clusterSize])))
         ) {
             ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
                 .setName("cache")
@@ -937,13 +947,10 @@ public class FunctionalTest extends AbstractBinaryArraysTest {
             cache.put(1, "value5");
 
             // Test failover.
-            ClientProcessorMXBean mxBean = getMxBean(ignite.name(), "Clients",
-                ClientListenerProcessor.class, ClientProcessorMXBean.class);
-
             try (ClientTransaction tx = client.transactions().txStart()) {
                 cache.put(1, "value6");
 
-                mxBean.dropAllConnections();
+                ((TcpClientTransactions.TcpClientTransaction)tx).clientChannel().close();
 
                 try {
                     cache.put(1, "value7");
@@ -1157,9 +1164,35 @@ public class FunctionalTest extends AbstractBinaryArraysTest {
                 assertEquals("value23", cache.get(0));
             }
 
+            // Test that new transaction can be started after commit of the previous one without closing.
+            ClientTransaction tx = client.transactions().txStart();
+            tx.commit();
+
+            tx = client.transactions().txStart();
+            tx.rollback();
+
+            // Test that new transaction can be started after rollback of the previous one without closing.
+            tx = client.transactions().txStart();
+            tx.commit();
+
+            // Test that an implicit transaction can be started after commit of the previous one without closing.
+            cache.put(0, "value24");
+
+            GridTestUtils.runAsync(() -> assertEquals("value24", cache.get(0))).get();
+        }
+    }
+
+    /**
+     * Test active transactions limit per connection.
+     */
+    @Test
+    public void testTransactionsLimit() throws Exception {
+        try (IgniteEx ignite = (IgniteEx)Ignition.start(Config.getServerConfiguration());
+             IgniteClient client = Ignition.startClient(getClientConfiguration())
+        ) {
             // Test active transactions limit.
-            int txLimit = ignite.configuration().getClientConnectorConfiguration().getThinClientConfiguration()
-                .getMaxActiveTxPerConnection();
+            int txLimit = ignite.configuration().getClientConnectorConfiguration()
+                .getThinClientConfiguration().getMaxActiveTxPerConnection();
 
             List<ClientTransaction> txs = new ArrayList<>(txLimit);
 
@@ -1181,22 +1214,6 @@ public class FunctionalTest extends AbstractBinaryArraysTest {
 
             for (ClientTransaction tx : txs)
                 tx.close();
-
-            // Test that new transaction can be started after commit of the previous one without closing.
-            ClientTransaction tx = client.transactions().txStart();
-            tx.commit();
-
-            tx = client.transactions().txStart();
-            tx.rollback();
-
-            // Test that new transaction can be started after rollback of the previous one without closing.
-            tx = client.transactions().txStart();
-            tx.commit();
-
-            // Test that implicit transaction started after commit of previous one without closing.
-            cache.put(0, "value24");
-
-            GridTestUtils.runAsync(() -> assertEquals("value24", cache.get(0))).get();
         }
     }
 
@@ -1205,8 +1222,11 @@ public class FunctionalTest extends AbstractBinaryArraysTest {
      */
     @Test
     public void testTransactionsAsync() throws Exception {
-        try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
-             IgniteClient client = Ignition.startClient(getClientConfiguration())
+        int clusterSize = 3;
+
+        try (LocalIgniteCluster cluster = LocalIgniteCluster.start(clusterSize);
+             IgniteClient client = Ignition.startClient(getClientConfiguration()
+                 .setAddresses(cluster.clientAddresses().toArray(new String[clusterSize])))
         ) {
             ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
                     .setName("cache")
@@ -1240,6 +1260,21 @@ public class FunctionalTest extends AbstractBinaryArraysTest {
             }
 
             assertEquals("value2", cache.get(1));
+
+            // Test multi-key operations.
+            try (ClientTransaction tx = client.transactions().txStart()) {
+                cache.putAllAsync(F.asMap(1, "value3", 2, "value3")).get();
+
+                assertEquals(F.asMap(1, "value3", 2, "value3"),
+                    cache.getAllAsync(new HashSet<>(F.asList(1, 2))).get());
+
+                cache.removeAllAsync(new HashSet<>(F.asList(1, 2))).get();
+
+                assertFalse(cache.containsKeysAsync(new HashSet<>(F.asList(1, 2))).get());
+            }
+
+            assertEquals("value2", cache.get(1));
+            assertFalse(cache.containsKey(2));
         }
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
index 713979a5303..77038a09a7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
@@ -20,9 +20,11 @@ package org.apache.ignite.internal.client.thin;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -77,7 +79,7 @@ public class ReliableChannelTest {
 
     /**
      * Checks that in case if address specified without port, the default port will be processed first
-     * */
+     */
     @Test
     public void testAddressWithoutPort() {
         ClientConfiguration ccfg = new ClientConfiguration().setAddresses("127.0.0.1");
@@ -88,28 +90,38 @@ public class ReliableChannelTest {
 
         assertEquals(ClientConnectorConfiguration.DFLT_PORT_RANGE + 1, rc.getChannelHolders().size());
 
-        assertEquals(ClientConnectorConfiguration.DFLT_PORT, rc.getChannelHolders().iterator().next().getAddress().getPort());
+        assertEquals(ClientConnectorConfiguration.DFLT_PORT, F.first(rc.getChannelHolders()).getAddress().getPort());
+
+        assertEquals(0, rc.getCurrentChannelIndex());
     }
 
     /**
-     * Checks that ReliableChannel provides channels in the same order as in ClientConfiguration.
-     * */
+     * Checks that ReliableChannel chooses a random address as the default from the set of addresses with the same (minimal) port.
+     */
     @Test
-    public void testAddressesOrder() {
-        String[] addrs = new String[] {"127.0.0.1:10803", "127.0.0.1:10802", "127.0.0.1:10801", "127.0.0.1:10800"};
+    public void testDefaultChannelBalancing() {
+        assertEquals(new HashSet<>(F.asList("127.0.0.2:10800", "127.0.0.3:10800", "127.0.0.4:10800")),
+            usedDefaultChannels("127.0.0.1:10801..10809", "127.0.0.2", "127.0.0.3:10800", "127.0.0.4:10800..10809"));
+
+        assertEquals(new HashSet<>(F.asList("127.0.0.1:10800", "127.0.0.2:10800", "127.0.0.3:10800", "127.0.0.4:10800")),
+            usedDefaultChannels("127.0.0.1:10800", "127.0.0.2:10800", "127.0.0.3:10800", "127.0.0.4:10800"));
+    }
 
+    /** */
+    private Set<String> usedDefaultChannels(String... addrs) {
         ClientConfiguration ccfg = new ClientConfiguration().setAddresses(addrs);
 
-        ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+        Set<String> usedChannels = new HashSet<>();
 
-        rc.channelsInit();
+        for (int i = 0; i < 100; i++) {
+            ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
 
-        List<ReliableChannel.ClientChannelHolder> holders = rc.getChannelHolders();
+            rc.channelsInit();
 
-        assertEquals(addrs.length, holders.size());
+            usedChannels.add(rc.getChannelHolders().get(rc.getCurrentChannelIndex()).getAddress().toString());
+        }
 
-        for (int i = 0; i < addrs.length; i++)
-            assertEquals(addrs[i], holders.get(i).getAddress().toString());
+        return usedChannels;
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index ba2624aeef3..9ffaa31a84a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT;
 
@@ -83,9 +84,6 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
     /** Operations queue. */
     protected final Queue<T2<TestTcpClientChannel, ClientOperation>> opsQueue = new ConcurrentLinkedQueue<>();
 
-    /** Default channel. */
-    protected TestTcpClientChannel dfltCh;
-
     /** Client instance. */
     protected IgniteClient client;
 
@@ -143,7 +141,7 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
     /**
      * Checks that operation goes through specified channel.
      */
-    protected void assertOpOnChannel(TestTcpClientChannel expCh, ClientOperation expOp) {
+    protected void assertOpOnChannel(@Nullable TestTcpClientChannel expCh, ClientOperation expOp) {
         T2<TestTcpClientChannel, ClientOperation> nextChOp = opsQueue.poll();
 
         assertNotNull("Unexpected (null) next operation [expCh=" + expCh + ", expOp=" + expOp + ']', nextChOp);
@@ -151,8 +149,10 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
         assertEquals("Unexpected operation on channel [expCh=" + expCh + ", expOp=" + expOp +
                 ", nextOpCh=" + nextChOp + ']', expOp, nextChOp.get2());
 
-        assertEquals("Unexpected channel for operation [expCh=" + expCh + ", expOp=" + expOp +
-            ", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1());
+        if (expCh != null) {
+            assertEquals("Unexpected channel for operation [expCh=" + expCh + ", expOp=" + expOp +
+                ", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1());
+        }
     }
 
     /**
@@ -175,7 +175,7 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
                 return channels[i];
         }
 
-        return dfltCh;
+        return null;
     }
 
     /**
@@ -212,25 +212,19 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
 
         awaitChannelsInit(chIdxs);
 
-        initDefaultChannel();
+        opsQueue.clear();
     }
 
     /**
-     *
+     * Trigger client to detect topology change.
      */
-    protected void initDefaultChannel() {
+    protected void detectTopologyChange() {
         opsQueue.clear();
 
-        // Send non-affinity request to determine default channel.
+        // Send non-affinity request to detect topology change.
         client.getOrCreateCache(REPL_CACHE_NAME);
 
-        T2<TestTcpClientChannel, ClientOperation> nextChOp = opsQueue.poll();
-
-        assertNotNull(nextChOp);
-
-        assertEquals(nextChOp.get2(), ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME);
-
-        dfltCh = nextChOp.get1();
+        assertOpOnChannel(null, ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME);
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java
new file mode 100644
index 00000000000..f89dffe8c3f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessBalancingTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.BitSet;
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.junit.Test;
+
+/**
+ * Tests how the thin client distributes requests among its connections.
+ */
+public class ThinClientPartitionAwarenessBalancingTest extends ThinClientAbstractPartitionAwarenessTest {
+    /** */
+    @Test
+    public void testConnectionDistribution() throws Exception {
+        startGrids(3);
+
+        initClient(getClientConfiguration(0, 1, 2, 3), 0, 1, 2);
+
+        BitSet usedConnections = new BitSet();
+
+        for (int i = 0; i < 100; i++)
+            client.cacheNames(); // Non-affinity requests should be randomly distributed among connections.
+
+        List<TestTcpClientChannel> channelList = F.asList(channels);
+
+        while (!opsQueue.isEmpty()) {
+            T2<TestTcpClientChannel, ClientOperation> op = opsQueue.poll();
+
+            usedConnections.set(channelList.indexOf(op.get1()));
+        }
+
+        assertEquals(BitSet.valueOf(new byte[] {7}), usedConnections); // 7 = set of {0, 1, 2}
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java
index d7611842ab8..dff4d8ea118 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java
@@ -123,12 +123,15 @@ public class ThinClientPartitionAwarenessDiscoveryTest extends ThinClientAbstrac
             clientCache.put(i, i);
 
             if (i == 0)
-                assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+                assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
 
             assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
-            assertTrue(channelHits.containsKey(opCh));
 
-            channelHits.compute(opCh, (c, old) -> true);
+            if (opCh != null) {
+                assertTrue(channelHits.containsKey(opCh));
+
+                channelHits.compute(opCh, (c, old) -> true);
+            }
         }
 
         assertFalse(channelHits.containsValue(false));
@@ -156,12 +159,4 @@ public class ThinClientPartitionAwarenessDiscoveryTest extends ThinClientAbstrac
             .setAddressesFinder(addrFinder)
             .setPartitionAwarenessEnabled(true);
     }
-
-    /**
-     * Trigger client to detect topology change.
-     */
-    private void detectTopologyChange() {
-        // Send non-affinity request to detect topology change.
-        initDefaultChannel();
-    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
index 279d31865cf..6de86591c82 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
@@ -88,7 +88,7 @@ public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
         clientCache.put(0, 0);
         TestTcpClientChannel opCh = affinityChannel(0, gridCache);
 
-        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+        assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
         assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
 
         for (int i = 1; i < KEY_CNT; i++)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
index 5556da02ee5..b94062b0988 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
@@ -148,8 +148,8 @@ public class ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
 
         clientCache.put(keyForUnknownNode, 0);
 
-        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
-        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+        assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
+        assertOpOnChannel(null, ClientOperation.CACHE_PUT);
     }
 
     /**
@@ -191,18 +191,14 @@ public class ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
         for (int i = 0; i < GRIDS_CNT; i++) {
             int part = grid(i).affinity(PART_CACHE_NAME).primaryPartitions(grid(i).localNode())[0];
 
-            // Client doesn't have connection with grid(0).
-            TestTcpClientChannel ch = i == 0 ? dfltCh : nodeChannel(grid(i).localNode().id());
+            TestTcpClientChannel ch = nodeChannel(grid(i).localNode().id());
 
             // Test scan query with specified partition.
             clientCache.query(new ScanQuery<>().setPartition(part)).getAll();
 
+            // Client doesn't have a connection with grid(0), so ch will be null for this grid
+            // and an operation on any channel is acceptable.
             assertOpOnChannel(ch, ClientOperation.QUERY_SCAN);
-
-            // Test scan query without specified partition.
-            clientCache.query(new ScanQuery<>()).getAll();
-
-            assertOpOnChannel(dfltCh, ClientOperation.QUERY_SCAN);
         }
     }
 
@@ -345,17 +341,17 @@ public class ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
         // After first response we should send partitions request on default channel together with next request.
         cache.put(0, 0);
 
-        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
-        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+        assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
+        assertOpOnChannel(null, ClientOperation.CACHE_PUT);
 
         for (int i = 1; i < KEY_CNT; i++) {
             cache.put(i, i);
 
-            assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+            assertOpOnChannel(null, ClientOperation.CACHE_PUT);
 
             cache.get(i);
 
-            assertOpOnChannel(dfltCh, ClientOperation.CACHE_GET);
+            assertOpOnChannel(null, ClientOperation.CACHE_GET);
         }
     }
 
@@ -371,9 +367,7 @@ public class ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
 
         TestTcpClientChannel opCh = affinityChannel(keyFactory.apply(0), igniteCache);
 
-        // Default channel is the first who detects topology change, so next partition request will go through
-        // the default channel.
-        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+        assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
         assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
 
         for (int i = 1; i < KEY_CNT; i++) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
index e167ab1652c..dfe93a97c69 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
@@ -60,7 +60,7 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
 
         awaitChannelsInit(3);
 
-        assertOpOnChannel(dfltCh, ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME);
+        assertOpOnChannel(null, ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME);
 
         Integer key = primaryKey(grid(3).cache(PART_CACHE_NAME));
 
@@ -68,9 +68,7 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
 
         cache.put(key, 0);
 
-        // Cache partitions are requested from default channel, since it's first (and currently the only) channel
-        // which detects new topology.
-        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+        assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
 
         assertOpOnChannel(channels[3], ClientOperation.CACHE_PUT);
 
@@ -98,8 +96,8 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
 
         awaitPartitionMapExchange();
 
-        // Next request will also detect topology change.
-        initDefaultChannel();
+        // Detect topology change.
+        detectTopologyChange();
 
         // Test partition awareness after node join.
         testPartitionAwareness(true);
@@ -119,8 +117,8 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
         // Test partition awareness before connection to node lost.
         testPartitionAwareness(true);
 
-        // Choose node to disconnect (not default node).
-        int disconnectNodeIdx = dfltCh == channels[0] ? 1 : 0;
+        // Choose node to disconnect.
+        int disconnectNodeIdx = 0;
 
         // Drop all thin connections from the node.
         getMxBean(grid(disconnectNodeIdx).name(), "Clients",
@@ -137,8 +135,8 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
 
         cache.put(key, 0);
 
-        // Request goes to default channel, since affinity node is disconnected.
-        assertOpOnChannel(dfltCh, ClientOperation.CACHE_PUT);
+        // Request goes to the connected channel, since affinity node is disconnected.
+        assertOpOnChannel(channels[1], ClientOperation.CACHE_PUT);
 
         cache.put(key, 0);
 
@@ -175,7 +173,7 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
         // Send any request to failover.
         client.cache(REPL_CACHE_NAME).put(0, 0);
 
-        initDefaultChannel();
+        detectTopologyChange();
 
         awaitChannelsInit(0, 1);
 
@@ -197,7 +195,7 @@ public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
             clientCache.put(i, i);
 
             if (partReq) {
-                assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+                assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
 
                 partReq = false;
             }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 4a97cd5d1ee..83b6f0fa86d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -24,12 +24,14 @@ import org.apache.ignite.internal.client.thin.ClusterApiTest;
 import org.apache.ignite.internal.client.thin.ClusterGroupTest;
 import org.apache.ignite.internal.client.thin.ComputeTaskTest;
 import org.apache.ignite.internal.client.thin.DataReplicationOperationsTest;
+import org.apache.ignite.internal.client.thin.FunctionalTest;
 import org.apache.ignite.internal.client.thin.IgniteSetTest;
 import org.apache.ignite.internal.client.thin.MetadataRegistrationTest;
 import org.apache.ignite.internal.client.thin.OptimizedMarshallerClassesCachedTest;
 import org.apache.ignite.internal.client.thin.ReliableChannelTest;
 import org.apache.ignite.internal.client.thin.ServicesBinaryArraysTests;
 import org.apache.ignite.internal.client.thin.ServicesTest;
+import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessBalancingTest;
 import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessDiscoveryTest;
 import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResourceReleaseTest;
 import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest;
@@ -69,6 +71,7 @@ import org.junit.runners.Suite;
     ThinClientPartitionAwarenessUnstableTopologyTest.class,
     ThinClientPartitionAwarenessResourceReleaseTest.class,
     ThinClientPartitionAwarenessDiscoveryTest.class,
+    ThinClientPartitionAwarenessBalancingTest.class,
     ReliableChannelTest.class,
     CacheAsyncTest.class,
     TimeoutTest.class,