You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/11/08 13:11:07 UTC

[ignite] branch ignite-2.12 updated: IGNITE-15807 Java thin: follow user-defined endpoint order, try default port first (#9522)

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch ignite-2.12
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-2.12 by this push:
     new 92e56ea  IGNITE-15807 Java thin: follow user-defined endpoint order, try default port first (#9522)
92e56ea is described below

commit 92e56eaeec55fec26c463545c9031449ef872672
Author: Ilya Kazakov <ka...@gmail.com>
AuthorDate: Mon Nov 8 19:43:30 2021 +0800

    IGNITE-15807 Java thin: follow user-defined endpoint order, try default port first (#9522)
    
    (cherry picked from commit c59ec1e6e6e5121e29f3b164617d8d30edbb08c7)
---
 .../internal/client/thin/ReliableChannel.java      | 50 ++++++++++------------
 .../internal/client/thin/ReliableChannelTest.java  | 38 ++++++++++++++++
 2 files changed, 61 insertions(+), 27 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index b75ad0c..2b48471 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -451,10 +450,9 @@ final class ReliableChannel implements AutoCloseable {
     }
 
     /**
-     * @return host:port_range address lines parsed as {@link InetSocketAddress} as a key. Value is the amount of
-     * appearences of an address in {@code addrs} parameter.
+     * @return List of host:port_range address lines parsed as {@link InetSocketAddress}.
      */
-    private static Map<InetSocketAddress, Integer> parsedAddresses(String[] addrs) throws ClientException {
+    private static List<InetSocketAddress> parsedAddresses(String[] addrs) throws ClientException {
         if (F.isEmpty(addrs))
             throw new ClientException("Empty addresses");
 
@@ -478,8 +476,7 @@ final class ReliableChannel implements AutoCloseable {
             .flatMap(r -> IntStream
                 .rangeClosed(r.portFrom(), r.portTo()).boxed()
                 .map(p -> InetSocketAddress.createUnresolved(r.host(), p))
-            )
-            .collect(Collectors.toMap(a -> a, a -> 1, Integer::sum));
+            ).collect(Collectors.toList());
     }
 
     /**
@@ -598,7 +595,7 @@ final class ReliableChannel implements AutoCloseable {
         // Enable parallel threads to schedule new init of channel holders.
         scheduledChannelsReinit.set(false);
 
-        Map<InetSocketAddress, Integer> newAddrs = null;
+        List<InetSocketAddress> newAddrs = null;
 
         if (clientCfg.getAddressesFinder() != null) {
             String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
@@ -620,14 +617,18 @@ final class ReliableChannel implements AutoCloseable {
         }
 
         Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
-        Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs.keySet());
 
-        if (holders != null) {
-            for (int i = 0; i < holders.size(); i++) {
-                ClientChannelHolder h = holders.get(i);
+        Map<InetSocketAddress, ClientChannelHolder> newHoldersMap = new HashMap<>();
+
+        Set<InetSocketAddress> newAddrsSet = new HashSet<>(newAddrs);
 
-                curAddrs.put(h.chCfg.getAddress(), h);
-                allAddrs.add(h.chCfg.getAddress());
+        // Close obsolete holders or map old but valid addresses to holders
+        if (holders != null) {
+            for (ClientChannelHolder h : holders) {
+                if (newAddrsSet.contains(h.getAddress()))
+                    curAddrs.put(h.getAddress(), h);
+                else
+                    h.close();
             }
         }
 
@@ -644,23 +645,18 @@ final class ReliableChannel implements AutoCloseable {
         if (idx != -1)
             currDfltHolder = holders.get(idx);
 
-        for (InetSocketAddress addr : allAddrs) {
+        for (InetSocketAddress addr : newAddrs) {
             if (shouldStopChannelsReinit())
                 return false;
 
-            // Obsolete addr, to be removed.
-            if (!newAddrs.containsKey(addr)) {
-                curAddrs.get(addr).close();
-
-                continue;
-            }
-
             // Create new holders for new addrs.
             if (!curAddrs.containsKey(addr)) {
-                ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));
+                ClientChannelHolder hld =
+                        newHoldersMap.
+                                computeIfAbsent(addr,
+                                        a -> new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, a)));
 
-                for (int i = 0; i < newAddrs.get(addr); i++)
-                    reinitHolders.add(hld);
+                reinitHolders.add(hld);
 
                 continue;
             }
@@ -668,20 +664,20 @@ final class ReliableChannel implements AutoCloseable {
             // This holder is up to date.
             ClientChannelHolder hld = curAddrs.get(addr);
 
-            for (int i = 0; i < newAddrs.get(addr); i++)
-                reinitHolders.add(hld);
+            reinitHolders.add(hld);
 
             if (hld == currDfltHolder)
                 dfltChannelIdx = reinitHolders.size() - 1;
         }
 
         if (dfltChannelIdx == -1)
-            dfltChannelIdx = new Random().nextInt(reinitHolders.size());
+            dfltChannelIdx = 0;
 
         curChannelsGuard.writeLock().lock();
 
         try {
             channels = reinitHolders;
+
             curChIdx = dfltChannelIdx;
         }
         finally {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
index 6469afb..4a82635 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.client.ClientAuthorizationException;
 import org.apache.ignite.client.ClientConnectionException;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
 import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
@@ -75,6 +76,43 @@ public class ReliableChannelTest {
     }
 
     /**
+     * Checks that in case if address specified without port, the default port will be processed first
+     * */
+    @Test
+    public void testAddressWithoutPort() {
+        ClientConfiguration ccfg = new ClientConfiguration().setAddresses("127.0.0.1");
+
+        ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+
+        rc.channelsInit();
+
+        assertEquals(ClientConnectorConfiguration.DFLT_PORT_RANGE + 1, rc.getChannelHolders().size());
+
+        assertEquals(ClientConnectorConfiguration.DFLT_PORT, rc.getChannelHolders().iterator().next().getAddress().getPort());
+    }
+
+    /**
+     * Checks that ReliableChannel provides channels in the same order as in ClientConfiguration.
+     * */
+    @Test
+    public void testAddressesOrder() {
+        String[] addrs = new String[] {"127.0.0.1:10803", "127.0.0.1:10802", "127.0.0.1:10801", "127.0.0.1:10800"};
+
+        ClientConfiguration ccfg = new ClientConfiguration().setAddresses(addrs);
+
+        ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+
+        rc.channelsInit();
+
+        List<ReliableChannel.ClientChannelHolder> holders = rc.getChannelHolders();
+
+        assertEquals(addrs.length, holders.size());
+
+        for (int i = 0; i < addrs.length; i++)
+            assertEquals(addrs[i], holders.get(i).getAddress().toString());
+    }
+
+    /**
      * Checks that reinitialization of duplicated address is correct.
      */
     @Test