You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/11/08 13:11:07 UTC
[ignite] branch ignite-2.12 updated: IGNITE-15807 Java thin: follow user-defined endpoint order, try default port first (#9522)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch ignite-2.12
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.12 by this push:
new 92e56ea IGNITE-15807 Java thin: follow user-defined endpoint order, try default port first (#9522)
92e56ea is described below
commit 92e56eaeec55fec26c463545c9031449ef872672
Author: Ilya Kazakov <ka...@gmail.com>
AuthorDate: Mon Nov 8 19:43:30 2021 +0800
IGNITE-15807 Java thin: follow user-defined endpoint order, try default port first (#9522)
(cherry picked from commit c59ec1e6e6e5121e29f3b164617d8d30edbb08c7)
---
.../internal/client/thin/ReliableChannel.java | 50 ++++++++++------------
.../internal/client/thin/ReliableChannelTest.java | 38 ++++++++++++++++
2 files changed, 61 insertions(+), 27 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index b75ad0c..2b48471 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -451,10 +450,9 @@ final class ReliableChannel implements AutoCloseable {
}
/**
- * @return host:port_range address lines parsed as {@link InetSocketAddress} as a key. Value is the amount of
- * appearences of an address in {@code addrs} parameter.
+ * @return List of host:port_range address lines parsed as {@link InetSocketAddress}.
*/
- private static Map<InetSocketAddress, Integer> parsedAddresses(String[] addrs) throws ClientException {
+ private static List<InetSocketAddress> parsedAddresses(String[] addrs) throws ClientException {
if (F.isEmpty(addrs))
throw new ClientException("Empty addresses");
@@ -478,8 +476,7 @@ final class ReliableChannel implements AutoCloseable {
.flatMap(r -> IntStream
.rangeClosed(r.portFrom(), r.portTo()).boxed()
.map(p -> InetSocketAddress.createUnresolved(r.host(), p))
- )
- .collect(Collectors.toMap(a -> a, a -> 1, Integer::sum));
+ ).collect(Collectors.toList());
}
/**
@@ -598,7 +595,7 @@ final class ReliableChannel implements AutoCloseable {
// Enable parallel threads to schedule new init of channel holders.
scheduledChannelsReinit.set(false);
- Map<InetSocketAddress, Integer> newAddrs = null;
+ List<InetSocketAddress> newAddrs = null;
if (clientCfg.getAddressesFinder() != null) {
String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
@@ -620,14 +617,18 @@ final class ReliableChannel implements AutoCloseable {
}
Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
- Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs.keySet());
- if (holders != null) {
- for (int i = 0; i < holders.size(); i++) {
- ClientChannelHolder h = holders.get(i);
+ Map<InetSocketAddress, ClientChannelHolder> newHoldersMap = new HashMap<>();
+
+ Set<InetSocketAddress> newAddrsSet = new HashSet<>(newAddrs);
- curAddrs.put(h.chCfg.getAddress(), h);
- allAddrs.add(h.chCfg.getAddress());
+ // Close obsolete holders or map old but valid addresses to holders
+ if (holders != null) {
+ for (ClientChannelHolder h : holders) {
+ if (newAddrsSet.contains(h.getAddress()))
+ curAddrs.put(h.getAddress(), h);
+ else
+ h.close();
}
}
@@ -644,23 +645,18 @@ final class ReliableChannel implements AutoCloseable {
if (idx != -1)
currDfltHolder = holders.get(idx);
- for (InetSocketAddress addr : allAddrs) {
+ for (InetSocketAddress addr : newAddrs) {
if (shouldStopChannelsReinit())
return false;
- // Obsolete addr, to be removed.
- if (!newAddrs.containsKey(addr)) {
- curAddrs.get(addr).close();
-
- continue;
- }
-
// Create new holders for new addrs.
if (!curAddrs.containsKey(addr)) {
- ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));
+ ClientChannelHolder hld =
+ newHoldersMap.
+ computeIfAbsent(addr,
+ a -> new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, a)));
- for (int i = 0; i < newAddrs.get(addr); i++)
- reinitHolders.add(hld);
+ reinitHolders.add(hld);
continue;
}
@@ -668,20 +664,20 @@ final class ReliableChannel implements AutoCloseable {
// This holder is up to date.
ClientChannelHolder hld = curAddrs.get(addr);
- for (int i = 0; i < newAddrs.get(addr); i++)
- reinitHolders.add(hld);
+ reinitHolders.add(hld);
if (hld == currDfltHolder)
dfltChannelIdx = reinitHolders.size() - 1;
}
if (dfltChannelIdx == -1)
- dfltChannelIdx = new Random().nextInt(reinitHolders.size());
+ dfltChannelIdx = 0;
curChannelsGuard.writeLock().lock();
try {
channels = reinitHolders;
+
curChIdx = dfltChannelIdx;
}
finally {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
index 6469afb..4a82635 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.client.ClientAuthorizationException;
import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.F;
@@ -75,6 +76,43 @@ public class ReliableChannelTest {
}
/**
+ * Checks that in case if address specified without port, the default port will be processed first
+ * */
+ @Test
+ public void testAddressWithoutPort() {
+ ClientConfiguration ccfg = new ClientConfiguration().setAddresses("127.0.0.1");
+
+ ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+
+ rc.channelsInit();
+
+ assertEquals(ClientConnectorConfiguration.DFLT_PORT_RANGE + 1, rc.getChannelHolders().size());
+
+ assertEquals(ClientConnectorConfiguration.DFLT_PORT, rc.getChannelHolders().iterator().next().getAddress().getPort());
+ }
+
+ /**
+ * Checks that ReliableChannel provides channels in the same order as in ClientConfiguration.
+ * */
+ @Test
+ public void testAddressesOrder() {
+ String[] addrs = new String[] {"127.0.0.1:10803", "127.0.0.1:10802", "127.0.0.1:10801", "127.0.0.1:10800"};
+
+ ClientConfiguration ccfg = new ClientConfiguration().setAddresses(addrs);
+
+ ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+
+ rc.channelsInit();
+
+ List<ReliableChannel.ClientChannelHolder> holders = rc.getChannelHolders();
+
+ assertEquals(addrs.length, holders.size());
+
+ for (int i = 0; i < addrs.length; i++)
+ assertEquals(addrs[i], holders.get(i).getAddress().toString());
+ }
+
+ /**
* Checks that reinitialization of duplicated address is correct.
*/
@Test