You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/10/14 14:42:35 UTC

[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8206: IGNITE-13204 Add Java thin client Kubernetes auto discovery

alex-plekhanov commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r504452829



##########
File path: modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
##########
@@ -118,21 +123,47 @@
     /** Reconnect throttling retries. See {@code reconnectThrottlingPeriod}. */
     private int reconnectThrottlingRetries = 3;
 
+    /** Retry limit. */
+    private int retryLimit = 0;
+
     /** Executor for async operations continuations. */
     private Executor asyncContinuationExecutor;
 
     /**
      * @return Host addresses.
      */
     public String[] getAddresses() {
-        return addrs;
+        if (addrs != null)
+            return Arrays.copyOf(addrs, addrs.length);

Review comment:
       NL: an empty line is needed after this statement.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -223,67 +203,78 @@
     ) throws ClientException, ClientError {
         CompletableFuture<T> fut = new CompletableFuture<>();
 
-        ClientChannel ch = channel();
-
-        ch.serviceAsync(op, payloadWriter, payloadReader).handle((res, err) ->
-                handleServiceAsync(op, payloadWriter, payloadReader, fut, null, null, ch, res, err));
+        // Use the only one attempt to avoid blocking async method.
+        handleServiceAsync(fut, op, payloadWriter, payloadReader, 1, null);
 
         return new IgniteClientFutureImpl<>(fut);
     }
 
     /**
      * Handles serviceAsync results and retries as needed.
      */
-    private <T> Object handleServiceAsync(ClientOperation op,
-                                          Consumer<PayloadOutputChannel> payloadWriter,
-                                          Function<PayloadInputChannel, T> payloadReader,
-                                          CompletableFuture<T> fut,
-                                          ClientConnectionException failure,
-                                          AtomicInteger chIdx,
-                                          ClientChannel ch,
-                                          T res,
-                                          Throwable err) {
-        if (err == null) {
-            fut.complete(res);
-            return null;
+    private <T> void handleServiceAsync(final CompletableFuture<T> fut,
+                                        ClientOperation op,
+                                        Consumer<PayloadOutputChannel> payloadWriter,
+                                        Function<PayloadInputChannel, T> payloadReader,
+                                        int attemptsLimit,
+                                        ClientConnectionException failure) {
+        T2<ClientChannel, Integer> chAndAttempts;
+
+        try {
+            chAndAttempts = applyOnDefaultChannel(channel -> channel, attemptsLimit);
+
+        } catch (Throwable ex) {
+            fut.completeExceptionally(ex);

Review comment:
       NL: an empty line is needed after this statement.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -223,67 +203,78 @@
     ) throws ClientException, ClientError {
         CompletableFuture<T> fut = new CompletableFuture<>();
 
-        ClientChannel ch = channel();
-
-        ch.serviceAsync(op, payloadWriter, payloadReader).handle((res, err) ->
-                handleServiceAsync(op, payloadWriter, payloadReader, fut, null, null, ch, res, err));
+        // Use the only one attempt to avoid blocking async method.
+        handleServiceAsync(fut, op, payloadWriter, payloadReader, 1, null);
 
         return new IgniteClientFutureImpl<>(fut);
     }
 
     /**
      * Handles serviceAsync results and retries as needed.
      */
-    private <T> Object handleServiceAsync(ClientOperation op,
-                                          Consumer<PayloadOutputChannel> payloadWriter,
-                                          Function<PayloadInputChannel, T> payloadReader,
-                                          CompletableFuture<T> fut,
-                                          ClientConnectionException failure,
-                                          AtomicInteger chIdx,
-                                          ClientChannel ch,
-                                          T res,
-                                          Throwable err) {
-        if (err == null) {
-            fut.complete(res);
-            return null;
+    private <T> void handleServiceAsync(final CompletableFuture<T> fut,
+                                        ClientOperation op,
+                                        Consumer<PayloadOutputChannel> payloadWriter,
+                                        Function<PayloadInputChannel, T> payloadReader,
+                                        int attemptsLimit,
+                                        ClientConnectionException failure) {
+        T2<ClientChannel, Integer> chAndAttempts;
+
+        try {
+            chAndAttempts = applyOnDefaultChannel(channel -> channel, attemptsLimit);
+

Review comment:
       Redundant NL: this empty line should be removed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -606,16 +603,271 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /**
+     * Should the channel initialization be stopped.
+     */
+    private boolean shouldStopChannelsReinit() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     * @return {@code true} if holders are reinited and {@code false} if the initialization was interrupted.
+     */
+    synchronized boolean initChannelHolders(boolean force) {
+        List<ClientChannelHolder> holders = channels;
+
+        if (!force && holders != null)
+            return true;
+
+        startChannelsReInit = System.currentTimeMillis();
+
+        // Enable parallel threads to schedule new init of channel holders.
+        scheduledChannelsReinit.set(false);
+
+        Map<InetSocketAddress, Integer> newAddrs = null;
+
+        if (clientCfg.getAddressesFinder() != null) {
+            String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
+
+            if (hostAddrs.length == 0)
+                throw new ClientException("Empty addresses");
+
+            if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
+                newAddrs = parsedAddresses(hostAddrs);
+                prevHostAddrs = hostAddrs;
+            }
+        } else if (holders == null)
+            newAddrs = parsedAddresses(clientCfg.getAddresses());
+
+        if (newAddrs == null) {
+            finishChannelsReInit = System.currentTimeMillis();
+            return true;
+        }
+
+        Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
+        Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs.keySet());
+
+        if (holders != null) {
+            for (int i = 0; i < holders.size(); i++) {
+                ClientChannelHolder h = holders.get(i);
+
+                curAddrs.put(h.chCfg.getAddress(), h);
+                allAddrs.add(h.chCfg.getAddress());
+            }
+        }
+
+        List<ClientChannelHolder> reinitHolders = new ArrayList<>();
+
+        // The variable holds a new index of default channel after topology change.
+        // Suppose that reuse of the channel is better than open new connection.
+        int dfltChannelIdx = -1;
+
+        ClientChannelHolder currDfltHolder = null;
+
+        int idx = curChIdx;
+
+        if (idx != -1)
+            currDfltHolder = holders.get(idx);
+
+        for (InetSocketAddress addr : allAddrs) {
+            if (shouldStopChannelsReinit())
+                return false;
+
+            // Obsolete addr, to be removed.
+            if (!newAddrs.containsKey(addr)) {
+                curAddrs.get(addr).close();
+
+                continue;
+            }
+
+            // Create new holders for new addrs.
+            if (!curAddrs.containsKey(addr)) {
+                ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));
+
+                for (int i = 0; i < newAddrs.get(addr); i++)
+                    reinitHolders.add(hld);
+
+                continue;
+            }
+
+            // This holder is up to date.
+            ClientChannelHolder hld = curAddrs.get(addr);
+
+            for (int i = 0; i < newAddrs.get(addr); i++)
+                reinitHolders.add(hld);
+
+            if (hld == currDfltHolder)
+                dfltChannelIdx = reinitHolders.size() - 1;
+        }
+
+        if (dfltChannelIdx == -1)
+            dfltChannelIdx = new Random().nextInt(reinitHolders.size());
+
+        curChannelsGuard.writeLock().lock();

Review comment:
       NL: an empty line is needed after this statement.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -223,67 +203,78 @@
     ) throws ClientException, ClientError {
         CompletableFuture<T> fut = new CompletableFuture<>();
 
-        ClientChannel ch = channel();
-
-        ch.serviceAsync(op, payloadWriter, payloadReader).handle((res, err) ->
-                handleServiceAsync(op, payloadWriter, payloadReader, fut, null, null, ch, res, err));
+        // Use the only one attempt to avoid blocking async method.
+        handleServiceAsync(fut, op, payloadWriter, payloadReader, 1, null);
 
         return new IgniteClientFutureImpl<>(fut);
     }
 
     /**
      * Handles serviceAsync results and retries as needed.
      */
-    private <T> Object handleServiceAsync(ClientOperation op,
-                                          Consumer<PayloadOutputChannel> payloadWriter,
-                                          Function<PayloadInputChannel, T> payloadReader,
-                                          CompletableFuture<T> fut,
-                                          ClientConnectionException failure,
-                                          AtomicInteger chIdx,
-                                          ClientChannel ch,
-                                          T res,
-                                          Throwable err) {
-        if (err == null) {
-            fut.complete(res);
-            return null;
+    private <T> void handleServiceAsync(final CompletableFuture<T> fut,
+                                        ClientOperation op,
+                                        Consumer<PayloadOutputChannel> payloadWriter,
+                                        Function<PayloadInputChannel, T> payloadReader,
+                                        int attemptsLimit,
+                                        ClientConnectionException failure) {
+        T2<ClientChannel, Integer> chAndAttempts;
+
+        try {
+            chAndAttempts = applyOnDefaultChannel(channel -> channel, attemptsLimit);
+
+        } catch (Throwable ex) {
+            fut.completeExceptionally(ex);
+            return;
         }
 
-        if (err instanceof ClientConnectionException) {
-            onChannelFailure(ch);
+        ClientChannel ch = chAndAttempts.getKey();
 
-            if (failure == null)
-                failure = (ClientConnectionException) err;
-            else
-                failure.addSuppressed(err);
+        ch
+            .serviceAsync(op, payloadWriter, payloadReader)
+            .handle((res, err) -> {
+                if (err == null) {
+                    fut.complete(res);
 
-            if (chIdx == null)
-                chIdx = new AtomicInteger();
+                    return null;
+                }
 
-            while (chIdx.incrementAndGet() < channels.length) {
-                try {
-                    ch = channel();
+                ClientConnectionException failure0 = failure;
+
+                if (err instanceof ClientConnectionException) {
+                    try {
+                        // Will try to reinit channels if topology changed.
+                        onChannelFailure(ch);
+                    }
+                    catch (Throwable ex) {
+                        fut.completeExceptionally(ex);
+                        return null;
+                    }
 
-                    ClientConnectionException failure0 = failure;
-                    AtomicInteger chIdx0 = chIdx;
-                    ClientChannel ch0 = ch;
+                    if (failure0 == null)
+                        failure0 = (ClientConnectionException)err;
+                    else
+                        failure0.addSuppressed(err);
 
-                    ch.serviceAsync(op, payloadWriter, payloadReader).handle((res2, err2) ->
-                            handleServiceAsync(op, payloadWriter, payloadReader, fut, failure0, chIdx0, ch0, res2, err2));
+                    int leftAttempts = attemptsLimit - chAndAttempts.getValue();
 
+                    // If it is a first retry then reset attempts (as for initialization we use only 1 attempt).
+                    if (failure == null)
+                        leftAttempts = getRetryLimit() - 1;
+
+                    if (leftAttempts > 0) {
+                        handleServiceAsync(fut, op, payloadWriter, payloadReader, leftAttempts, failure0);
+                        return null;
+                    }
+                }
+                else {
+                    fut.completeExceptionally(err instanceof ClientException ? err : new ClientException(err));
                     return null;
-                } catch (ClientConnectionException e) {
-                    onChannelFailure(ch);
-                    failure.addSuppressed(e);
                 }
-            }
-        }
 
-        if (failure != null)
-            fut.completeExceptionally(failure);
-        else
-            fut.completeExceptionally(err instanceof ClientException ? err : new ClientException(err));
-
-        return null;
+                fut.completeExceptionally(failure0);

Review comment:
       NL: an empty line is needed after this statement.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -606,16 +603,271 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /**
+     * Should the channel initialization be stopped.
+     */
+    private boolean shouldStopChannelsReinit() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     * @return {@code true} if holders are reinited and {@code false} if the initialization was interrupted.
+     */
+    synchronized boolean initChannelHolders(boolean force) {
+        List<ClientChannelHolder> holders = channels;
+
+        if (!force && holders != null)
+            return true;
+
+        startChannelsReInit = System.currentTimeMillis();
+
+        // Enable parallel threads to schedule new init of channel holders.
+        scheduledChannelsReinit.set(false);
+
+        Map<InetSocketAddress, Integer> newAddrs = null;
+
+        if (clientCfg.getAddressesFinder() != null) {
+            String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
+
+            if (hostAddrs.length == 0)
+                throw new ClientException("Empty addresses");
+
+            if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
+                newAddrs = parsedAddresses(hostAddrs);
+                prevHostAddrs = hostAddrs;
+            }
+        } else if (holders == null)
+            newAddrs = parsedAddresses(clientCfg.getAddresses());
+
+        if (newAddrs == null) {
+            finishChannelsReInit = System.currentTimeMillis();
+            return true;
+        }
+
+        Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
+        Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs.keySet());
+
+        if (holders != null) {
+            for (int i = 0; i < holders.size(); i++) {
+                ClientChannelHolder h = holders.get(i);
+
+                curAddrs.put(h.chCfg.getAddress(), h);
+                allAddrs.add(h.chCfg.getAddress());
+            }
+        }
+
+        List<ClientChannelHolder> reinitHolders = new ArrayList<>();
+
+        // The variable holds a new index of default channel after topology change.
+        // Suppose that reuse of the channel is better than open new connection.
+        int dfltChannelIdx = -1;
+
+        ClientChannelHolder currDfltHolder = null;
+
+        int idx = curChIdx;
+
+        if (idx != -1)
+            currDfltHolder = holders.get(idx);
+
+        for (InetSocketAddress addr : allAddrs) {
+            if (shouldStopChannelsReinit())
+                return false;
+
+            // Obsolete addr, to be removed.
+            if (!newAddrs.containsKey(addr)) {
+                curAddrs.get(addr).close();
+
+                continue;
+            }
+
+            // Create new holders for new addrs.
+            if (!curAddrs.containsKey(addr)) {
+                ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));
+
+                for (int i = 0; i < newAddrs.get(addr); i++)
+                    reinitHolders.add(hld);
+
+                continue;
+            }
+
+            // This holder is up to date.
+            ClientChannelHolder hld = curAddrs.get(addr);
+
+            for (int i = 0; i < newAddrs.get(addr); i++)
+                reinitHolders.add(hld);
+
+            if (hld == currDfltHolder)
+                dfltChannelIdx = reinitHolders.size() - 1;
+        }
+
+        if (dfltChannelIdx == -1)
+            dfltChannelIdx = new Random().nextInt(reinitHolders.size());
+
+        curChannelsGuard.writeLock().lock();
+        try {
+            channels = reinitHolders;
+            curChIdx = dfltChannelIdx;
+        }
+        finally {
+            curChannelsGuard.writeLock().unlock();
+        }
+
+        finishChannelsReInit = System.currentTimeMillis();
+        return true;
+    }
+
+    /**
+     * Establishing connections to servers. If partition awareness feature is enabled connections are created
+     * for every configured server. Otherwise only default channel is connected.
+     */
+    void channelsInit(boolean force) {
+        if (!force && channels != null)
+            return;
+
+        // Do not establish connections if interrupted.
+        if (!initChannelHolders(force))
+            return;
+
+        // Apply no-op function. Establish default channel connection.
+        applyOnDefaultChannel(channel -> null);
+
+        if (partitionAwarenessEnabled)
+            initAllChannelsAsync();
+    }
+
+    /**
+     * Apply specified {@code function} on a channel corresponding to specified {@code nodeId}.
+     */
+    private <T> T applyOnNodeChannel(UUID nodeId, Function<ClientChannel, T> function) {
+        ClientChannelHolder hld = null;
+        ClientChannel channel = null;
+
+        try {
+            hld = nodeChannels.get(nodeId);
+
+            channel = hld != null ? hld.getOrCreateChannel() : null;
+
+            if (channel != null)
+                return function.apply(channel);
+
+        } catch (ClientConnectionException e) {
+            onChannelFailure(hld, channel);
+        }
+
+        return null;
+    }
+
+    /** */
+    private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function) {
+        List<ClientChannelHolder> holders = channels;
+
+        if (holders == null)
+            throw new ClientException("Connections to nodes aren't initialized.");
+
+        int size = holders.size();
+
+        int attemptsLimit = clientCfg.getRetryLimit() > 0 ?
+            Math.min(clientCfg.getRetryLimit(), size) : size;
+
+        return applyOnDefaultChannel(function, attemptsLimit).getKey();
+    }
+
+    /**
+     * Apply specified {@code function} on any of available channel.
+     */
+    private <T> T2<T, Integer> applyOnDefaultChannel(Function<ClientChannel, T> function, int attemptsLimit) {
+        ClientConnectionException failure = null;
+
+        for (int attempt = 0; attempt < attemptsLimit; attempt++) {
+            ClientChannelHolder hld = null;
+            ClientChannel c = null;
+
+            try {
+                if (closed)
+                    throw new ClientException("Channel is closed");
+
+                curChannelsGuard.readLock().lock();
+
+                try {
+                    hld = channels.get(curChIdx);
+                } finally {
+                    curChannelsGuard.readLock().unlock();
+                }
+
+                c = hld.getOrCreateChannel();
+
+                if (c != null)
+                    return new T2<>(function.apply(c), attempt + 1);
+            }
+            catch (ClientConnectionException e) {
+                if (failure == null)
+                    failure = e;
+                else
+                    failure.addSuppressed(e);
+
+                onChannelFailure(hld, c);
+            }
+        }
+
+        throw failure;
+    }
+
+    /**
+     * Try apply specified {@code function} on a channel corresponding to {@code tryNodeId}.
+     * If failed then apply the function on any available channel.
+     */
+    private <T> T applyOnNodeChannelWithFallback(UUID tryNodeId, Function<ClientChannel, T> function) {
+        ClientChannelHolder hld = nodeChannels.get(tryNodeId);
+
+        int retryLimit = getRetryLimit();
+
+        if (hld != null) {
+            ClientChannel channel = null;
+
+            try {
+                channel = hld.getOrCreateChannel();
+
+                if (channel != null)
+                    return function.apply(channel);
+
+            } catch (ClientConnectionException e) {
+                onChannelFailure(hld, channel);
+
+                retryLimit -= 1;
+
+                if (retryLimit == 0)
+                    throw e;
+            }
+        }
+
+        return applyOnDefaultChannel(function, retryLimit).getKey();
+    }
+
+    /** Get retry limit. */
+    private int getRetryLimit() {
+        int size = channels.size();
+
+        return clientCfg.getRetryLimit() > 0 ? Math.min(clientCfg.getRetryLimit(), size) : size;
+    }
+
     /**
      * Channels holder.
      */
-    private class ClientChannelHolder {
+    class ClientChannelHolder {
         /** Channel configuration. */
         private final ClientChannelConfiguration chCfg;
 
         /** Channel. */
         private volatile ClientChannel ch;
 
+        /** ID of the last server node that {@link ch} is or was connected to. */
+        private volatile UUID serverNodeId;
+
+        /** Address that holder is bind to (chCfg.addr) is not in use now. So close the holder */

Review comment:
       The Javadoc sentence should end with a period.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -561,5 +746,24 @@ private synchronized void closeChannel() {
                 ch = null;
             }
         }
+
+        /** Close holder. */
+        void close() {
+            close = true;
+            if (serverNodeId != null)
+                nodeChannels.remove(serverNodeId);

Review comment:
       Still not fixed

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -606,16 +603,271 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /**
+     * Should the channel initialization be stopped.
+     */
+    private boolean shouldStopChannelsReinit() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     * @return {@code true} if holders are reinited and {@code false} if the initialization was interrupted.
+     */
+    synchronized boolean initChannelHolders(boolean force) {
+        List<ClientChannelHolder> holders = channels;
+
+        if (!force && holders != null)
+            return true;
+
+        startChannelsReInit = System.currentTimeMillis();
+
+        // Enable parallel threads to schedule new init of channel holders.
+        scheduledChannelsReinit.set(false);
+
+        Map<InetSocketAddress, Integer> newAddrs = null;
+
+        if (clientCfg.getAddressesFinder() != null) {
+            String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
+
+            if (hostAddrs.length == 0)
+                throw new ClientException("Empty addresses");
+
+            if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
+                newAddrs = parsedAddresses(hostAddrs);
+                prevHostAddrs = hostAddrs;
+            }
+        } else if (holders == null)
+            newAddrs = parsedAddresses(clientCfg.getAddresses());
+
+        if (newAddrs == null) {
+            finishChannelsReInit = System.currentTimeMillis();
+            return true;
+        }
+
+        Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
+        Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs.keySet());
+
+        if (holders != null) {
+            for (int i = 0; i < holders.size(); i++) {
+                ClientChannelHolder h = holders.get(i);
+
+                curAddrs.put(h.chCfg.getAddress(), h);
+                allAddrs.add(h.chCfg.getAddress());
+            }
+        }
+
+        List<ClientChannelHolder> reinitHolders = new ArrayList<>();
+
+        // The variable holds a new index of default channel after topology change.
+        // Suppose that reuse of the channel is better than open new connection.
+        int dfltChannelIdx = -1;
+
+        ClientChannelHolder currDfltHolder = null;
+
+        int idx = curChIdx;
+
+        if (idx != -1)
+            currDfltHolder = holders.get(idx);
+
+        for (InetSocketAddress addr : allAddrs) {
+            if (shouldStopChannelsReinit())
+                return false;
+
+            // Obsolete addr, to be removed.
+            if (!newAddrs.containsKey(addr)) {
+                curAddrs.get(addr).close();
+
+                continue;
+            }
+
+            // Create new holders for new addrs.
+            if (!curAddrs.containsKey(addr)) {
+                ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));
+
+                for (int i = 0; i < newAddrs.get(addr); i++)
+                    reinitHolders.add(hld);
+
+                continue;
+            }
+
+            // This holder is up to date.
+            ClientChannelHolder hld = curAddrs.get(addr);
+
+            for (int i = 0; i < newAddrs.get(addr); i++)
+                reinitHolders.add(hld);
+
+            if (hld == currDfltHolder)
+                dfltChannelIdx = reinitHolders.size() - 1;
+        }
+
+        if (dfltChannelIdx == -1)
+            dfltChannelIdx = new Random().nextInt(reinitHolders.size());
+
+        curChannelsGuard.writeLock().lock();
+        try {
+            channels = reinitHolders;
+            curChIdx = dfltChannelIdx;
+        }
+        finally {
+            curChannelsGuard.writeLock().unlock();
+        }
+
+        finishChannelsReInit = System.currentTimeMillis();
+        return true;
+    }
+
+    /**
+     * Establishing connections to servers. If partition awareness feature is enabled connections are created
+     * for every configured server. Otherwise only default channel is connected.
+     */
+    void channelsInit(boolean force) {
+        if (!force && channels != null)
+            return;
+
+        // Do not establish connections if interrupted.
+        if (!initChannelHolders(force))
+            return;
+
+        // Apply no-op function. Establish default channel connection.
+        applyOnDefaultChannel(channel -> null);
+
+        if (partitionAwarenessEnabled)
+            initAllChannelsAsync();
+    }
+
+    /**
+     * Apply specified {@code function} on a channel corresponding to specified {@code nodeId}.
+     */
+    private <T> T applyOnNodeChannel(UUID nodeId, Function<ClientChannel, T> function) {
+        ClientChannelHolder hld = null;
+        ClientChannel channel = null;
+
+        try {
+            hld = nodeChannels.get(nodeId);
+
+            channel = hld != null ? hld.getOrCreateChannel() : null;
+
+            if (channel != null)
+                return function.apply(channel);
+
+        } catch (ClientConnectionException e) {
+            onChannelFailure(hld, channel);
+        }
+
+        return null;
+    }
+
+    /** */
+    private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function) {
+        List<ClientChannelHolder> holders = channels;
+
+        if (holders == null)
+            throw new ClientException("Connections to nodes aren't initialized.");
+
+        int size = holders.size();
+
+        int attemptsLimit = clientCfg.getRetryLimit() > 0 ?
+            Math.min(clientCfg.getRetryLimit(), size) : size;

Review comment:
       Can be replaced with `getRetryLimit()`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -606,16 +603,271 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /**
+     * Should the channel initialization be stopped.
+     */
+    private boolean shouldStopChannelsReinit() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     * @return {@code true} if holders are reinited and {@code false} if the initialization was interrupted.
+     */
+    synchronized boolean initChannelHolders(boolean force) {
+        List<ClientChannelHolder> holders = channels;
+
+        if (!force && holders != null)
+            return true;
+
+        startChannelsReInit = System.currentTimeMillis();
+
+        // Enable parallel threads to schedule new init of channel holders.
+        scheduledChannelsReinit.set(false);
+
+        Map<InetSocketAddress, Integer> newAddrs = null;
+
+        if (clientCfg.getAddressesFinder() != null) {
+            String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
+
+            if (hostAddrs.length == 0)
+                throw new ClientException("Empty addresses");
+
+            if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
+                newAddrs = parsedAddresses(hostAddrs);
+                prevHostAddrs = hostAddrs;
+            }
+        } else if (holders == null)
+            newAddrs = parsedAddresses(clientCfg.getAddresses());
+
+        if (newAddrs == null) {
+            finishChannelsReInit = System.currentTimeMillis();

Review comment:
       NL: an empty line is needed after this statement.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -223,67 +203,78 @@
     ) throws ClientException, ClientError {
         CompletableFuture<T> fut = new CompletableFuture<>();
 
-        ClientChannel ch = channel();
-
-        ch.serviceAsync(op, payloadWriter, payloadReader).handle((res, err) ->
-                handleServiceAsync(op, payloadWriter, payloadReader, fut, null, null, ch, res, err));
+        // Use the only one attempt to avoid blocking async method.
+        handleServiceAsync(fut, op, payloadWriter, payloadReader, 1, null);
 
         return new IgniteClientFutureImpl<>(fut);
     }
 
     /**
      * Handles serviceAsync results and retries as needed.
      */
-    private <T> Object handleServiceAsync(ClientOperation op,
-                                          Consumer<PayloadOutputChannel> payloadWriter,
-                                          Function<PayloadInputChannel, T> payloadReader,
-                                          CompletableFuture<T> fut,
-                                          ClientConnectionException failure,
-                                          AtomicInteger chIdx,
-                                          ClientChannel ch,
-                                          T res,
-                                          Throwable err) {
-        if (err == null) {
-            fut.complete(res);
-            return null;
+    private <T> void handleServiceAsync(final CompletableFuture<T> fut,
+                                        ClientOperation op,
+                                        Consumer<PayloadOutputChannel> payloadWriter,
+                                        Function<PayloadInputChannel, T> payloadReader,
+                                        int attemptsLimit,
+                                        ClientConnectionException failure) {
+        T2<ClientChannel, Integer> chAndAttempts;
+
+        try {
+            chAndAttempts = applyOnDefaultChannel(channel -> channel, attemptsLimit);
+
+        } catch (Throwable ex) {
+            fut.completeExceptionally(ex);
+            return;
         }
 
-        if (err instanceof ClientConnectionException) {
-            onChannelFailure(ch);
+        ClientChannel ch = chAndAttempts.getKey();
 
-            if (failure == null)
-                failure = (ClientConnectionException) err;
-            else
-                failure.addSuppressed(err);
+        ch
+            .serviceAsync(op, payloadWriter, payloadReader)
+            .handle((res, err) -> {
+                if (err == null) {
+                    fut.complete(res);

Review comment:
       NL

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -223,67 +203,78 @@
     ) throws ClientException, ClientError {
         CompletableFuture<T> fut = new CompletableFuture<>();
 
-        ClientChannel ch = channel();
-
-        ch.serviceAsync(op, payloadWriter, payloadReader).handle((res, err) ->
-                handleServiceAsync(op, payloadWriter, payloadReader, fut, null, null, ch, res, err));
+        // Use the only one attempt to avoid blocking async method.
+        handleServiceAsync(fut, op, payloadWriter, payloadReader, 1, null);
 
         return new IgniteClientFutureImpl<>(fut);
     }
 
     /**
      * Handles serviceAsync results and retries as needed.
      */
-    private <T> Object handleServiceAsync(ClientOperation op,
-                                          Consumer<PayloadOutputChannel> payloadWriter,
-                                          Function<PayloadInputChannel, T> payloadReader,
-                                          CompletableFuture<T> fut,
-                                          ClientConnectionException failure,
-                                          AtomicInteger chIdx,
-                                          ClientChannel ch,
-                                          T res,
-                                          Throwable err) {
-        if (err == null) {
-            fut.complete(res);
-            return null;
+    private <T> void handleServiceAsync(final CompletableFuture<T> fut,
+                                        ClientOperation op,
+                                        Consumer<PayloadOutputChannel> payloadWriter,
+                                        Function<PayloadInputChannel, T> payloadReader,
+                                        int attemptsLimit,
+                                        ClientConnectionException failure) {
+        T2<ClientChannel, Integer> chAndAttempts;
+
+        try {
+            chAndAttempts = applyOnDefaultChannel(channel -> channel, attemptsLimit);
+
+        } catch (Throwable ex) {
+            fut.completeExceptionally(ex);
+            return;
         }
 
-        if (err instanceof ClientConnectionException) {
-            onChannelFailure(ch);
+        ClientChannel ch = chAndAttempts.getKey();
 
-            if (failure == null)
-                failure = (ClientConnectionException) err;
-            else
-                failure.addSuppressed(err);
+        ch
+            .serviceAsync(op, payloadWriter, payloadReader)
+            .handle((res, err) -> {
+                if (err == null) {
+                    fut.complete(res);
 
-            if (chIdx == null)
-                chIdx = new AtomicInteger();
+                    return null;
+                }
 
-            while (chIdx.incrementAndGet() < channels.length) {
-                try {
-                    ch = channel();
+                ClientConnectionException failure0 = failure;
+
+                if (err instanceof ClientConnectionException) {
+                    try {
+                        // Will try to reinit channels if topology changed.
+                        onChannelFailure(ch);
+                    }
+                    catch (Throwable ex) {
+                        fut.completeExceptionally(ex);

Review comment:
       NL

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -223,67 +203,78 @@
     ) throws ClientException, ClientError {
         CompletableFuture<T> fut = new CompletableFuture<>();
 
-        ClientChannel ch = channel();
-
-        ch.serviceAsync(op, payloadWriter, payloadReader).handle((res, err) ->
-                handleServiceAsync(op, payloadWriter, payloadReader, fut, null, null, ch, res, err));
+        // Use the only one attempt to avoid blocking async method.
+        handleServiceAsync(fut, op, payloadWriter, payloadReader, 1, null);
 
         return new IgniteClientFutureImpl<>(fut);
     }
 
     /**
      * Handles serviceAsync results and retries as needed.
      */
-    private <T> Object handleServiceAsync(ClientOperation op,
-                                          Consumer<PayloadOutputChannel> payloadWriter,
-                                          Function<PayloadInputChannel, T> payloadReader,
-                                          CompletableFuture<T> fut,
-                                          ClientConnectionException failure,
-                                          AtomicInteger chIdx,
-                                          ClientChannel ch,
-                                          T res,
-                                          Throwable err) {
-        if (err == null) {
-            fut.complete(res);
-            return null;
+    private <T> void handleServiceAsync(final CompletableFuture<T> fut,
+                                        ClientOperation op,
+                                        Consumer<PayloadOutputChannel> payloadWriter,
+                                        Function<PayloadInputChannel, T> payloadReader,
+                                        int attemptsLimit,
+                                        ClientConnectionException failure) {
+        T2<ClientChannel, Integer> chAndAttempts;
+
+        try {
+            chAndAttempts = applyOnDefaultChannel(channel -> channel, attemptsLimit);
+
+        } catch (Throwable ex) {
+            fut.completeExceptionally(ex);
+            return;
         }
 
-        if (err instanceof ClientConnectionException) {
-            onChannelFailure(ch);
+        ClientChannel ch = chAndAttempts.getKey();
 
-            if (failure == null)
-                failure = (ClientConnectionException) err;
-            else
-                failure.addSuppressed(err);
+        ch
+            .serviceAsync(op, payloadWriter, payloadReader)
+            .handle((res, err) -> {
+                if (err == null) {
+                    fut.complete(res);
 
-            if (chIdx == null)
-                chIdx = new AtomicInteger();
+                    return null;
+                }
 
-            while (chIdx.incrementAndGet() < channels.length) {
-                try {
-                    ch = channel();
+                ClientConnectionException failure0 = failure;
+
+                if (err instanceof ClientConnectionException) {
+                    try {
+                        // Will try to reinit channels if topology changed.
+                        onChannelFailure(ch);
+                    }
+                    catch (Throwable ex) {
+                        fut.completeExceptionally(ex);
+                        return null;
+                    }
 
-                    ClientConnectionException failure0 = failure;
-                    AtomicInteger chIdx0 = chIdx;
-                    ClientChannel ch0 = ch;
+                    if (failure0 == null)
+                        failure0 = (ClientConnectionException)err;
+                    else
+                        failure0.addSuppressed(err);
 
-                    ch.serviceAsync(op, payloadWriter, payloadReader).handle((res2, err2) ->
-                            handleServiceAsync(op, payloadWriter, payloadReader, fut, failure0, chIdx0, ch0, res2, err2));
+                    int leftAttempts = attemptsLimit - chAndAttempts.getValue();
 
+                    // If it is a first retry then reset attempts (as for initialization we use only 1 attempt).
+                    if (failure == null)
+                        leftAttempts = getRetryLimit() - 1;
+
+                    if (leftAttempts > 0) {
+                        handleServiceAsync(fut, op, payloadWriter, payloadReader, leftAttempts, failure0);
+                        return null;
+                    }
+                }
+                else {
+                    fut.completeExceptionally(err instanceof ClientException ? err : new ClientException(err));

Review comment:
       NL

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -223,67 +203,78 @@
     ) throws ClientException, ClientError {
         CompletableFuture<T> fut = new CompletableFuture<>();
 
-        ClientChannel ch = channel();
-
-        ch.serviceAsync(op, payloadWriter, payloadReader).handle((res, err) ->
-                handleServiceAsync(op, payloadWriter, payloadReader, fut, null, null, ch, res, err));
+        // Use the only one attempt to avoid blocking async method.
+        handleServiceAsync(fut, op, payloadWriter, payloadReader, 1, null);
 
         return new IgniteClientFutureImpl<>(fut);
     }
 
     /**
      * Handles serviceAsync results and retries as needed.
      */
-    private <T> Object handleServiceAsync(ClientOperation op,
-                                          Consumer<PayloadOutputChannel> payloadWriter,
-                                          Function<PayloadInputChannel, T> payloadReader,
-                                          CompletableFuture<T> fut,
-                                          ClientConnectionException failure,
-                                          AtomicInteger chIdx,
-                                          ClientChannel ch,
-                                          T res,
-                                          Throwable err) {
-        if (err == null) {
-            fut.complete(res);
-            return null;
+    private <T> void handleServiceAsync(final CompletableFuture<T> fut,
+                                        ClientOperation op,
+                                        Consumer<PayloadOutputChannel> payloadWriter,
+                                        Function<PayloadInputChannel, T> payloadReader,
+                                        int attemptsLimit,
+                                        ClientConnectionException failure) {
+        T2<ClientChannel, Integer> chAndAttempts;
+
+        try {
+            chAndAttempts = applyOnDefaultChannel(channel -> channel, attemptsLimit);
+
+        } catch (Throwable ex) {
+            fut.completeExceptionally(ex);

Review comment:
       What if `failure` is already non-null here? Will we lose this information?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -606,16 +603,271 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /**
+     * Should the channel initialization be stopped.
+     */
+    private boolean shouldStopChannelsReinit() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     * @return {@code true} if holders are reinited and {@code false} if the initialization was interrupted.
+     */
+    synchronized boolean initChannelHolders(boolean force) {
+        List<ClientChannelHolder> holders = channels;
+
+        if (!force && holders != null)
+            return true;
+
+        startChannelsReInit = System.currentTimeMillis();
+
+        // Enable parallel threads to schedule new init of channel holders.
+        scheduledChannelsReinit.set(false);
+
+        Map<InetSocketAddress, Integer> newAddrs = null;
+
+        if (clientCfg.getAddressesFinder() != null) {
+            String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
+
+            if (hostAddrs.length == 0)
+                throw new ClientException("Empty addresses");
+
+            if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
+                newAddrs = parsedAddresses(hostAddrs);
+                prevHostAddrs = hostAddrs;
+            }
+        } else if (holders == null)
+            newAddrs = parsedAddresses(clientCfg.getAddresses());
+
+        if (newAddrs == null) {
+            finishChannelsReInit = System.currentTimeMillis();
+            return true;
+        }
+
+        Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
+        Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs.keySet());
+
+        if (holders != null) {
+            for (int i = 0; i < holders.size(); i++) {
+                ClientChannelHolder h = holders.get(i);
+
+                curAddrs.put(h.chCfg.getAddress(), h);
+                allAddrs.add(h.chCfg.getAddress());
+            }
+        }
+
+        List<ClientChannelHolder> reinitHolders = new ArrayList<>();
+
+        // The variable holds a new index of default channel after topology change.
+        // Suppose that reuse of the channel is better than open new connection.
+        int dfltChannelIdx = -1;
+
+        ClientChannelHolder currDfltHolder = null;
+
+        int idx = curChIdx;
+
+        if (idx != -1)
+            currDfltHolder = holders.get(idx);
+
+        for (InetSocketAddress addr : allAddrs) {
+            if (shouldStopChannelsReinit())
+                return false;
+
+            // Obsolete addr, to be removed.
+            if (!newAddrs.containsKey(addr)) {
+                curAddrs.get(addr).close();
+
+                continue;
+            }
+
+            // Create new holders for new addrs.
+            if (!curAddrs.containsKey(addr)) {
+                ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));
+
+                for (int i = 0; i < newAddrs.get(addr); i++)
+                    reinitHolders.add(hld);
+
+                continue;
+            }
+
+            // This holder is up to date.
+            ClientChannelHolder hld = curAddrs.get(addr);
+
+            for (int i = 0; i < newAddrs.get(addr); i++)
+                reinitHolders.add(hld);
+
+            if (hld == currDfltHolder)
+                dfltChannelIdx = reinitHolders.size() - 1;
+        }
+
+        if (dfltChannelIdx == -1)
+            dfltChannelIdx = new Random().nextInt(reinitHolders.size());
+
+        curChannelsGuard.writeLock().lock();
+        try {
+            channels = reinitHolders;
+            curChIdx = dfltChannelIdx;
+        }
+        finally {
+            curChannelsGuard.writeLock().unlock();
+        }
+
+        finishChannelsReInit = System.currentTimeMillis();

Review comment:
       NL

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -223,67 +203,78 @@
     ) throws ClientException, ClientError {
         CompletableFuture<T> fut = new CompletableFuture<>();
 
-        ClientChannel ch = channel();
-
-        ch.serviceAsync(op, payloadWriter, payloadReader).handle((res, err) ->
-                handleServiceAsync(op, payloadWriter, payloadReader, fut, null, null, ch, res, err));
+        // Use the only one attempt to avoid blocking async method.
+        handleServiceAsync(fut, op, payloadWriter, payloadReader, 1, null);
 
         return new IgniteClientFutureImpl<>(fut);
     }
 
     /**
      * Handles serviceAsync results and retries as needed.
      */
-    private <T> Object handleServiceAsync(ClientOperation op,
-                                          Consumer<PayloadOutputChannel> payloadWriter,
-                                          Function<PayloadInputChannel, T> payloadReader,
-                                          CompletableFuture<T> fut,
-                                          ClientConnectionException failure,
-                                          AtomicInteger chIdx,
-                                          ClientChannel ch,
-                                          T res,
-                                          Throwable err) {
-        if (err == null) {
-            fut.complete(res);
-            return null;
+    private <T> void handleServiceAsync(final CompletableFuture<T> fut,
+                                        ClientOperation op,
+                                        Consumer<PayloadOutputChannel> payloadWriter,
+                                        Function<PayloadInputChannel, T> payloadReader,
+                                        int attemptsLimit,
+                                        ClientConnectionException failure) {
+        T2<ClientChannel, Integer> chAndAttempts;
+
+        try {
+            chAndAttempts = applyOnDefaultChannel(channel -> channel, attemptsLimit);

Review comment:
       I think it's better to control the attempt count in one method. I propose rewriting this method without using `applyOnDefaultChannel`, and rewriting `applyOnDefaultChannel` to return only the result rather than a `T2` with the attempts counter (this helps GC a little bit).
   WDYT?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -606,16 +603,271 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /**
+     * Should the channel initialization be stopped.
+     */
+    private boolean shouldStopChannelsReinit() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     * @return {@code true} if holders are reinited and {@code false} if the initialization was interrupted.
+     */
+    synchronized boolean initChannelHolders(boolean force) {
+        List<ClientChannelHolder> holders = channels;
+
+        if (!force && holders != null)
+            return true;
+
+        startChannelsReInit = System.currentTimeMillis();
+
+        // Enable parallel threads to schedule new init of channel holders.
+        scheduledChannelsReinit.set(false);
+
+        Map<InetSocketAddress, Integer> newAddrs = null;
+
+        if (clientCfg.getAddressesFinder() != null) {
+            String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
+
+            if (hostAddrs.length == 0)
+                throw new ClientException("Empty addresses");
+
+            if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
+                newAddrs = parsedAddresses(hostAddrs);
+                prevHostAddrs = hostAddrs;
+            }
+        } else if (holders == null)
+            newAddrs = parsedAddresses(clientCfg.getAddresses());
+
+        if (newAddrs == null) {
+            finishChannelsReInit = System.currentTimeMillis();
+            return true;
+        }
+
+        Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
+        Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs.keySet());
+
+        if (holders != null) {
+            for (int i = 0; i < holders.size(); i++) {
+                ClientChannelHolder h = holders.get(i);
+
+                curAddrs.put(h.chCfg.getAddress(), h);
+                allAddrs.add(h.chCfg.getAddress());
+            }
+        }
+
+        List<ClientChannelHolder> reinitHolders = new ArrayList<>();
+
+        // The variable holds a new index of default channel after topology change.
+        // Suppose that reuse of the channel is better than open new connection.
+        int dfltChannelIdx = -1;
+
+        ClientChannelHolder currDfltHolder = null;
+
+        int idx = curChIdx;
+
+        if (idx != -1)
+            currDfltHolder = holders.get(idx);
+
+        for (InetSocketAddress addr : allAddrs) {
+            if (shouldStopChannelsReinit())
+                return false;
+
+            // Obsolete addr, to be removed.
+            if (!newAddrs.containsKey(addr)) {
+                curAddrs.get(addr).close();
+
+                continue;
+            }
+
+            // Create new holders for new addrs.
+            if (!curAddrs.containsKey(addr)) {
+                ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));
+
+                for (int i = 0; i < newAddrs.get(addr); i++)
+                    reinitHolders.add(hld);
+
+                continue;
+            }
+
+            // This holder is up to date.
+            ClientChannelHolder hld = curAddrs.get(addr);
+
+            for (int i = 0; i < newAddrs.get(addr); i++)
+                reinitHolders.add(hld);
+
+            if (hld == currDfltHolder)
+                dfltChannelIdx = reinitHolders.size() - 1;
+        }
+
+        if (dfltChannelIdx == -1)
+            dfltChannelIdx = new Random().nextInt(reinitHolders.size());
+
+        curChannelsGuard.writeLock().lock();
+        try {
+            channels = reinitHolders;
+            curChIdx = dfltChannelIdx;
+        }
+        finally {
+            curChannelsGuard.writeLock().unlock();
+        }
+
+        finishChannelsReInit = System.currentTimeMillis();
+        return true;
+    }
+
+    /**
+     * Establishing connections to servers. If partition awareness feature is enabled connections are created
+     * for every configured server. Otherwise only default channel is connected.
+     */
+    void channelsInit(boolean force) {
+        if (!force && channels != null)
+            return;
+
+        // Do not establish connections if interrupted.
+        if (!initChannelHolders(force))
+            return;
+
+        // Apply no-op function. Establish default channel connection.
+        applyOnDefaultChannel(channel -> null);
+
+        if (partitionAwarenessEnabled)
+            initAllChannelsAsync();
+    }
+
+    /**
+     * Apply specified {@code function} on a channel corresponding to specified {@code nodeId}.
+     */
+    private <T> T applyOnNodeChannel(UUID nodeId, Function<ClientChannel, T> function) {
+        ClientChannelHolder hld = null;
+        ClientChannel channel = null;
+
+        try {
+            hld = nodeChannels.get(nodeId);
+
+            channel = hld != null ? hld.getOrCreateChannel() : null;
+
+            if (channel != null)
+                return function.apply(channel);
+

Review comment:
       Redundant NL

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -606,16 +603,271 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /**
+     * Should the channel initialization be stopped.
+     */
+    private boolean shouldStopChannelsReinit() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     * @return {@code true} if holders are reinited and {@code false} if the initialization was interrupted.
+     */
+    synchronized boolean initChannelHolders(boolean force) {
+        List<ClientChannelHolder> holders = channels;
+
+        if (!force && holders != null)
+            return true;
+
+        startChannelsReInit = System.currentTimeMillis();
+
+        // Enable parallel threads to schedule new init of channel holders.
+        scheduledChannelsReinit.set(false);
+
+        Map<InetSocketAddress, Integer> newAddrs = null;
+
+        if (clientCfg.getAddressesFinder() != null) {
+            String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
+
+            if (hostAddrs.length == 0)
+                throw new ClientException("Empty addresses");
+
+            if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
+                newAddrs = parsedAddresses(hostAddrs);
+                prevHostAddrs = hostAddrs;
+            }
+        } else if (holders == null)
+            newAddrs = parsedAddresses(clientCfg.getAddresses());
+
+        if (newAddrs == null) {
+            finishChannelsReInit = System.currentTimeMillis();
+            return true;
+        }
+
+        Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
+        Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs.keySet());
+
+        if (holders != null) {
+            for (int i = 0; i < holders.size(); i++) {
+                ClientChannelHolder h = holders.get(i);
+
+                curAddrs.put(h.chCfg.getAddress(), h);
+                allAddrs.add(h.chCfg.getAddress());
+            }
+        }
+
+        List<ClientChannelHolder> reinitHolders = new ArrayList<>();
+
+        // The variable holds a new index of default channel after topology change.
+        // Suppose that reuse of the channel is better than open new connection.
+        int dfltChannelIdx = -1;
+
+        ClientChannelHolder currDfltHolder = null;
+
+        int idx = curChIdx;
+
+        if (idx != -1)
+            currDfltHolder = holders.get(idx);
+
+        for (InetSocketAddress addr : allAddrs) {
+            if (shouldStopChannelsReinit())
+                return false;
+
+            // Obsolete addr, to be removed.
+            if (!newAddrs.containsKey(addr)) {
+                curAddrs.get(addr).close();
+
+                continue;
+            }
+
+            // Create new holders for new addrs.
+            if (!curAddrs.containsKey(addr)) {
+                ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));
+
+                for (int i = 0; i < newAddrs.get(addr); i++)
+                    reinitHolders.add(hld);
+
+                continue;
+            }
+
+            // This holder is up to date.
+            ClientChannelHolder hld = curAddrs.get(addr);
+
+            for (int i = 0; i < newAddrs.get(addr); i++)
+                reinitHolders.add(hld);
+
+            if (hld == currDfltHolder)
+                dfltChannelIdx = reinitHolders.size() - 1;
+        }
+
+        if (dfltChannelIdx == -1)
+            dfltChannelIdx = new Random().nextInt(reinitHolders.size());
+
+        curChannelsGuard.writeLock().lock();
+        try {
+            channels = reinitHolders;
+            curChIdx = dfltChannelIdx;
+        }
+        finally {
+            curChannelsGuard.writeLock().unlock();
+        }
+
+        finishChannelsReInit = System.currentTimeMillis();
+        return true;
+    }
+
+    /**
+     * Establishing connections to servers. If partition awareness feature is enabled connections are created
+     * for every configured server. Otherwise only default channel is connected.
+     */
+    void channelsInit(boolean force) {
+        if (!force && channels != null)

Review comment:
       Currently, the `force` flag is useless (at least for production code, but perhaps for test code too). There are 3 calls of this method from production code: the first is right after the constructor (when `channels == null`), and the other two calls pass `force == true`.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -512,80 +504,81 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
         return ranges.stream()
             .flatMap(r -> IntStream
                 .rangeClosed(r.portFrom(), r.portTo()).boxed()
-                .map(p -> new InetSocketAddress(r.host(), p))
+                .map(p -> InetSocketAddress.createUnresolved(r.host(), p))
             )
-            .collect(Collectors.toList());
+            .collect(Collectors.toMap(a -> a, a -> 1, Integer::sum));
     }
 
-    /** */
-    private synchronized ClientChannel channel() {
-        if (closed)
-            throw new ClientException("Channel is closed");
-
+    /**
+     * Roll current default channel if specified holder equals to it.
+     */
+    private void rollCurrentChannel(ClientChannelHolder hld) {
+        curChannelsGuard.writeLock().lock();

Review comment:
       NL

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -512,80 +504,81 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
         return ranges.stream()
             .flatMap(r -> IntStream
                 .rangeClosed(r.portFrom(), r.portTo()).boxed()
-                .map(p -> new InetSocketAddress(r.host(), p))
+                .map(p -> InetSocketAddress.createUnresolved(r.host(), p))
             )
-            .collect(Collectors.toList());
+            .collect(Collectors.toMap(a -> a, a -> 1, Integer::sum));
     }
 
-    /** */
-    private synchronized ClientChannel channel() {
-        if (closed)
-            throw new ClientException("Channel is closed");
-
+    /**
+     * Roll current default channel if specified holder equals to it.
+     */
+    private void rollCurrentChannel(ClientChannelHolder hld) {
+        curChannelsGuard.writeLock().lock();
         try {
-            return channels[curChIdx].getOrCreateChannel();
-        }
-        catch (ClientConnectionException e) {
-            rollCurrentChannel();
+            int idx = curChIdx;
+            List<ClientChannelHolder> holders = channels;
 
-            throw e;
-        }
-    }
+            ClientChannelHolder dfltHld = holders.get(idx);
 
-    /** */
-    private synchronized void rollCurrentChannel() {
-        if (++curChIdx >= channels.length)
-            curChIdx = 0;
+            if (dfltHld == hld) {
+                idx += 1;
+
+                if (idx >= holders.size())
+                    curChIdx = 0;
+                else
+                    curChIdx = idx;
+            }
+        } finally {
+            curChannelsGuard.writeLock().unlock();
+        }
     }
 
     /**
      * On current channel failure.
      */
-    private synchronized void onChannelFailure(ClientChannel ch) {
+    private void onChannelFailure(ClientChannel ch) {
         // There is nothing wrong if curChIdx was concurrently changed, since channel was closed by another thread
         // when current index was changed and no other wrong channel will be closed by current thread because
         // onChannelFailure checks channel binded to the holder before closing it.
-        onChannelFailure(channels[curChIdx], ch);
-
-        chFailLsnrs.forEach(Runnable::run);
+        onChannelFailure(channels.get(curChIdx), ch);
     }
 
     /**
      * On channel of the specified holder failure.
      */
-    private synchronized void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
-        if (ch == hld.ch && ch != null) {
+    private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
+        if (ch != null && ch == hld.ch)
             hld.closeChannel();
 
-            if (hld == channels[curChIdx])
-                rollCurrentChannel();
-        }
+        chFailLsnrs.forEach(Runnable::run);
+
+        if (scheduledChannelsReinit.get())
+            channelsInit(true);
+        else
+            rollCurrentChannel(hld);

Review comment:
       Why do we skip `rollCurrentChannel` if `scheduledChannelsReinit` is set? I think we should roll the channel in any case.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -512,80 +504,81 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
         return ranges.stream()
             .flatMap(r -> IntStream
                 .rangeClosed(r.portFrom(), r.portTo()).boxed()
-                .map(p -> new InetSocketAddress(r.host(), p))
+                .map(p -> InetSocketAddress.createUnresolved(r.host(), p))
             )
-            .collect(Collectors.toList());
+            .collect(Collectors.toMap(a -> a, a -> 1, Integer::sum));
     }
 
-    /** */
-    private synchronized ClientChannel channel() {
-        if (closed)
-            throw new ClientException("Channel is closed");
-
+    /**
+     * Roll current default channel if specified holder equals to it.
+     */
+    private void rollCurrentChannel(ClientChannelHolder hld) {
+        curChannelsGuard.writeLock().lock();
         try {
-            return channels[curChIdx].getOrCreateChannel();
-        }
-        catch (ClientConnectionException e) {
-            rollCurrentChannel();
+            int idx = curChIdx;
+            List<ClientChannelHolder> holders = channels;
 
-            throw e;
-        }
-    }
+            ClientChannelHolder dfltHld = holders.get(idx);
 
-    /** */
-    private synchronized void rollCurrentChannel() {
-        if (++curChIdx >= channels.length)
-            curChIdx = 0;
+            if (dfltHld == hld) {
+                idx += 1;
+
+                if (idx >= holders.size())
+                    curChIdx = 0;
+                else
+                    curChIdx = idx;
+            }
+        } finally {
+            curChannelsGuard.writeLock().unlock();
+        }
     }
 
     /**
      * On current channel failure.
      */
-    private synchronized void onChannelFailure(ClientChannel ch) {
+    private void onChannelFailure(ClientChannel ch) {
         // There is nothing wrong if curChIdx was concurrently changed, since channel was closed by another thread
         // when current index was changed and no other wrong channel will be closed by current thread because
         // onChannelFailure checks channel binded to the holder before closing it.
-        onChannelFailure(channels[curChIdx], ch);
-
-        chFailLsnrs.forEach(Runnable::run);
+        onChannelFailure(channels.get(curChIdx), ch);
     }
 
     /**
      * On channel of the specified holder failure.
      */
-    private synchronized void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
-        if (ch == hld.ch && ch != null) {
+    private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
+        if (ch != null && ch == hld.ch)
             hld.closeChannel();
 
-            if (hld == channels[curChIdx])
-                rollCurrentChannel();
-        }
+        chFailLsnrs.forEach(Runnable::run);
+
+        if (scheduledChannelsReinit.get())

Review comment:
       Perhaps `&& !partitionAwarenessEnabled` should be added here (to avoid a double reinit when partition awareness is enabled: the first time here and the second time in the async thread). WDYT?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -223,67 +203,78 @@
     ) throws ClientException, ClientError {
         CompletableFuture<T> fut = new CompletableFuture<>();
 
-        ClientChannel ch = channel();
-
-        ch.serviceAsync(op, payloadWriter, payloadReader).handle((res, err) ->
-                handleServiceAsync(op, payloadWriter, payloadReader, fut, null, null, ch, res, err));
+        // Use the only one attempt to avoid blocking async method.
+        handleServiceAsync(fut, op, payloadWriter, payloadReader, 1, null);
 
         return new IgniteClientFutureImpl<>(fut);
     }
 
     /**
      * Handles serviceAsync results and retries as needed.
      */
-    private <T> Object handleServiceAsync(ClientOperation op,
-                                          Consumer<PayloadOutputChannel> payloadWriter,
-                                          Function<PayloadInputChannel, T> payloadReader,
-                                          CompletableFuture<T> fut,
-                                          ClientConnectionException failure,
-                                          AtomicInteger chIdx,
-                                          ClientChannel ch,
-                                          T res,
-                                          Throwable err) {
-        if (err == null) {
-            fut.complete(res);
-            return null;
+    private <T> void handleServiceAsync(final CompletableFuture<T> fut,
+                                        ClientOperation op,
+                                        Consumer<PayloadOutputChannel> payloadWriter,
+                                        Function<PayloadInputChannel, T> payloadReader,
+                                        int attemptsLimit,
+                                        ClientConnectionException failure) {
+        T2<ClientChannel, Integer> chAndAttempts;
+
+        try {
+            chAndAttempts = applyOnDefaultChannel(channel -> channel, attemptsLimit);
+
+        } catch (Throwable ex) {
+            fut.completeExceptionally(ex);
+            return;
         }
 
-        if (err instanceof ClientConnectionException) {
-            onChannelFailure(ch);
+        ClientChannel ch = chAndAttempts.getKey();
 
-            if (failure == null)
-                failure = (ClientConnectionException) err;
-            else
-                failure.addSuppressed(err);
+        ch
+            .serviceAsync(op, payloadWriter, payloadReader)
+            .handle((res, err) -> {
+                if (err == null) {
+                    fut.complete(res);
 
-            if (chIdx == null)
-                chIdx = new AtomicInteger();
+                    return null;
+                }
 
-            while (chIdx.incrementAndGet() < channels.length) {
-                try {
-                    ch = channel();
+                ClientConnectionException failure0 = failure;
+
+                if (err instanceof ClientConnectionException) {
+                    try {
+                        // Will try to reinit channels if topology changed.
+                        onChannelFailure(ch);
+                    }
+                    catch (Throwable ex) {
+                        fut.completeExceptionally(ex);
+                        return null;
+                    }
 
-                    ClientConnectionException failure0 = failure;
-                    AtomicInteger chIdx0 = chIdx;
-                    ClientChannel ch0 = ch;
+                    if (failure0 == null)
+                        failure0 = (ClientConnectionException)err;
+                    else
+                        failure0.addSuppressed(err);
 
-                    ch.serviceAsync(op, payloadWriter, payloadReader).handle((res2, err2) ->
-                            handleServiceAsync(op, payloadWriter, payloadReader, fut, failure0, chIdx0, ch0, res2, err2));
+                    int leftAttempts = attemptsLimit - chAndAttempts.getValue();
 
+                    // If it is a first retry then reset attempts (as for initialization we use only 1 attempt).
+                    if (failure == null)
+                        leftAttempts = getRetryLimit() - 1;
+
+                    if (leftAttempts > 0) {
+                        handleServiceAsync(fut, op, payloadWriter, payloadReader, leftAttempts, failure0);

Review comment:
       NL




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org