You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2023/02/09 14:05:55 UTC
[ignite] branch master updated: IGNITE-18591 Java thin client: Endpoints discovery - Fixes #10514.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 714ae9cc213 IGNITE-18591 Java thin client: Endpoints discovery - Fixes #10514.
714ae9cc213 is described below
commit 714ae9cc2133e4c91d9ee1e6a6fd9ebd2ee71109
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Thu Feb 9 16:59:16 2023 +0300
IGNITE-18591 Java thin client: Endpoints discovery - Fixes #10514.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../ComputeTaskRemoteSecurityContextTest.java | 2 +-
.../ignite/common/NodeSslConnectionMetricTest.java | 2 +
.../util/GridCommandHandlerMetadataTest.java | 3 +-
.../client/thin/ClientCacheAffinityContext.java | 3 +-
.../client/thin/ClientChannelConfiguration.java | 12 +-
.../client/thin/ClientDiscoveryContext.java | 303 +++++++++++++
.../internal/client/thin/ClientOperation.java | 3 +
.../client/thin/ProtocolBitmaskFeature.java | 3 +
.../internal/client/thin/ReliableChannel.java | 474 ++++++++++-----------
.../internal/client/thin/TcpClientChannel.java | 56 ++-
.../org/apache/ignite/client/ReliabilityTest.java | 104 +++--
.../apache/ignite/client/ReliabilityTestAsync.java | 38 --
.../client/ReliabilityTestPartitionAware.java | 37 --
.../client/ReliabilityTestPartitionAwareAsync.java | 35 --
.../client/thin/AbstractThinClientTest.java | 42 +-
.../client/thin/CacheEntryListenersTest.java | 34 ++
.../internal/client/thin/ComputeTaskTest.java | 7 +
.../internal/client/thin/ReliableChannelTest.java | 9 +-
.../ThinClientAbstractPartitionAwarenessTest.java | 23 +-
.../thin/ThinClientEnpointsDiscoveryTest.java | 110 +++++
...ClientPartitionAwarenessStableTopologyTest.java | 13 +
.../IgniteClientRequestEventListenerTest.java | 13 +-
.../processors/odbc/ClientListenerMetricsTest.java | 8 +-
.../PerformanceStatisticsThinClientTest.java | 4 +-
.../org/apache/ignite/client/ClientTestSuite.java | 7 +-
.../cache/IgniteCacheAbstractQuerySelfTest.java | 2 +-
.../org/apache/ignite/util/KillCommandsTests.java | 4 +-
.../discovery/TestClusterClientConnection.java | 20 +-
28 files changed, 947 insertions(+), 424 deletions(-)
diff --git a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskRemoteSecurityContextTest.java b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskRemoteSecurityContextTest.java
index 7c372cfd5fa..2587f1dc37a 100644
--- a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskRemoteSecurityContextTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskRemoteSecurityContextTest.java
@@ -221,7 +221,7 @@ public class ComputeTaskRemoteSecurityContextTest extends AbstractEventSecurityC
String login = "thin_client";
ClientConfiguration cfg = new ClientConfiguration()
- .setAddresses(Config.SERVER)
+ .setAddressesFinder(() -> new String[] {Config.SERVER})
.setUserName(login)
.setUserPassword("");
diff --git a/modules/clients/src/test/java/org/apache/ignite/common/NodeSslConnectionMetricTest.java b/modules/clients/src/test/java/org/apache/ignite/common/NodeSslConnectionMetricTest.java
index 3f85cd37e2a..484bb737f18 100644
--- a/modules/clients/src/test/java/org/apache/ignite/common/NodeSslConnectionMetricTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/common/NodeSslConnectionMetricTest.java
@@ -390,6 +390,8 @@ public class NodeSslConnectionMetricTest extends GridCommonAbstractTest {
) {
return new ClientConfiguration()
.setAddresses("127.0.0.1:10800")
+ // When PA is enabled, async client channel init executes and spoils the metrics.
+ .setPartitionAwarenessEnabled(false)
.setSslMode(SslMode.REQUIRED)
.setSslContextFactory(sslContextFactory(keyStore, trustStore, cipherSuite, protocol));
}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerMetadataTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerMetadataTest.java
index 50fcb5df7b8..b2305716b5f 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerMetadataTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerMetadataTest.java
@@ -716,7 +716,8 @@ public class GridCommandHandlerMetadataTest extends GridCommandHandlerClusterByC
/** */
protected ClientConfiguration clientConfiguration() {
return new ClientConfiguration()
- .setAddresses("127.0.0.1:10800");
+ .setAddressesFinder(() -> new String[] {"127.0.0.1:10800"})
+ .setPartitionAwarenessEnabled(false);
}
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
index 3c2e9686372..465f07500c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientCacheAffinityContext.java
@@ -123,8 +123,6 @@ public class ClientCacheAffinityContext {
* @param ch Payload output channel.
*/
public void writePartitionsUpdateRequest(PayloadOutputChannel ch) {
- assert rq == null : "Previous mapping request was not properly handled: " + rq;
-
final Set<Integer> cacheIds;
long lastAccessed;
@@ -139,6 +137,7 @@ public class ClientCacheAffinityContext {
.orElse(0);
}
+ // In case of IO error rq can hold previous mapping request. Just overwrite it, we don't need it anymore.
rq = new CacheMappingRequest(cacheIds, lastAccessed);
ClientCacheAffinityMapping.writeRequest(ch, rq.caches, rq.ts > 0);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
index f22ce8f91e4..97cdc822159 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.client.thin;
import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.cache.configuration.Factory;
@@ -34,7 +36,7 @@ import org.apache.ignite.internal.client.monitoring.EventListenerDemultiplexer;
*/
final class ClientChannelConfiguration {
/** Host. */
- private final InetSocketAddress addr;
+ private final List<InetSocketAddress> addrs;
/** Ssl mode. */
private final SslMode sslMode;
@@ -118,7 +120,7 @@ final class ClientChannelConfiguration {
* Constructor.
*/
@SuppressWarnings("UnnecessaryThis")
- ClientChannelConfiguration(ClientConfiguration cfg, InetSocketAddress addr) {
+ ClientChannelConfiguration(ClientConfiguration cfg, List<InetSocketAddress> addrs) {
this.sslMode = cfg.getSslMode();
this.tcpNoDelay = cfg.isTcpNoDelay();
this.timeout = cfg.getTimeout();
@@ -138,7 +140,7 @@ final class ClientChannelConfiguration {
this.userPwd = cfg.getUserPassword();
this.reconnectThrottlingPeriod = cfg.getReconnectThrottlingPeriod();
this.reconnectThrottlingRetries = cfg.getReconnectThrottlingRetries();
- this.addr = addr;
+ this.addrs = Collections.unmodifiableList(addrs);
this.userAttrs = cfg.getUserAttributes();
this.asyncContinuationExecutor = cfg.getAsyncContinuationExecutor();
this.heartbeatEnabled = cfg.isHeartbeatEnabled();
@@ -151,8 +153,8 @@ final class ClientChannelConfiguration {
/**
* @return Address.
*/
- public InetSocketAddress getAddress() {
- return addr;
+ public List<InetSocketAddress> getAddresses() {
+ return addrs;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientDiscoveryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientDiscoveryContext.java
new file mode 100644
index 00000000000..38040dd862c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientDiscoveryContext.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.util.HostAndPortRange;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.logger.NullLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Nodes discovery context for Ignite client.
+ */
+public class ClientDiscoveryContext {
+ /** Indicates unknown topology version. */
+ private static final long UNKNOWN_TOP_VER = -1;
+
+ /** */
+ private final AtomicBoolean refreshIsInProgress = new AtomicBoolean();
+
+ /** */
+ private final IgniteLogger log;
+
+ /** Statically configured addresses. */
+ @Nullable private final String[] addresses;
+
+ /** Configured address finder. */
+ @Nullable private final ClientAddressFinder addrFinder;
+
+ /** */
+ private volatile TopologyInfo topInfo;
+
+ /** Cache addresses returned by {@link ClientAddressFinder}. */
+ private volatile String[] prevHostAddrs;
+
+ /** Previously requested endpoints for topology version. */
+ private volatile long prevTopVer = UNKNOWN_TOP_VER;
+
+ /** */
+ public ClientDiscoveryContext(ClientConfiguration clientCfg) {
+ log = NullLogger.whenNull(clientCfg.getLogger());
+ addresses = clientCfg.getAddresses();
+ addrFinder = clientCfg.getAddressesFinder();
+ reset();
+ }
+
+ /** */
+ void reset() {
+ topInfo = new TopologyInfo(UNKNOWN_TOP_VER, Collections.emptyMap());
+ prevTopVer = UNKNOWN_TOP_VER;
+ prevHostAddrs = null;
+ }
+
+ /**
+ * Updates nodes endpoints from the server.
+ *
+ * @param ch Channel.
+ * @return {@code True} if updated.
+ */
+ boolean refresh(ClientChannel ch) {
+ if (addrFinder != null)
+ return false; // Used custom address finder.
+
+ if (!ch.protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.CLUSTER_GROUP_GET_NODES_ENDPOINTS))
+ return false;
+
+ if (ch.serverTopologyVersion() != null && topInfo.topVer >= ch.serverTopologyVersion().topologyVersion()) {
+ if (log.isDebugEnabled())
+ log.debug("Endpoints information is up to date, no update required");
+
+ return false; // Info is up to date.
+ }
+
+ // Allow only one request at time.
+ if (refreshIsInProgress.compareAndSet(false, true)) {
+ if (log.isDebugEnabled())
+ log.debug("Updating nodes endpoints");
+
+ try {
+ Map<UUID, NodeInfo> nodes = new HashMap<>(topInfo.nodes);
+
+ TopologyInfo newTopInfo = ch.service(ClientOperation.CLUSTER_GROUP_GET_NODE_ENDPOINTS,
+ req -> {
+ req.out().writeLong(topInfo.topVer);
+ req.out().writeLong(UNKNOWN_TOP_VER);
+ },
+ res -> {
+ try (BinaryReaderExImpl reader = ClientUtils.createBinaryReader(null, res.in())) {
+ long topVer = reader.readLong();
+
+ // Read added nodes.
+ int nodesAdded = reader.readInt();
+
+ for (int i = 0; i < nodesAdded; i++) {
+ UUID nodeId = new UUID(reader.readLong(), reader.readLong());
+ int port = reader.readInt();
+
+ int addrsCnt = reader.readInt();
+
+ List<String> addrs = new ArrayList<>();
+
+ for (int j = 0; j < addrsCnt; j++)
+ addrs.add(reader.readString());
+
+ nodes.put(nodeId, new NodeInfo(port, addrs));
+ }
+
+ // Read removed nodes.
+ int nodesRemoved = reader.readInt();
+
+ for (int i = 0; i < nodesRemoved; i++) {
+ UUID nodeId = new UUID(reader.readLong(), reader.readLong());
+
+ nodes.remove(nodeId);
+ }
+
+ return new TopologyInfo(topVer, nodes);
+ }
+ catch (IOException e) {
+ // Only declared for close() method, but never throwed.
+ assert false : "Unexpected exception: " + e;
+
+ return null;
+ }
+ }
+ );
+
+ if (log.isDebugEnabled()) {
+ log.debug("Updated nodes endpoints [topVer=" + newTopInfo.topVer +
+ ", nodesCnt=" + newTopInfo.nodes.size() + ']');
+ }
+
+ if (topInfo.topVer < newTopInfo.topVer) {
+ topInfo = newTopInfo;
+ return true;
+ }
+ else
+ return false;
+ }
+ finally {
+ refreshIsInProgress.set(false);
+ }
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Concurrent nodes endpoints update already in progress, skipping");
+
+ return false;
+ }
+ }
+
+ /**
+ * Gets list of endpoins for each node.
+ *
+ * @return Collection of nodes with list of endpoints for each node, or {@code null} if endpoints are not changed
+ * since last request.
+ */
+ @Nullable Collection<List<InetSocketAddress>> getEndpoints() {
+ Collection<List<InetSocketAddress>> endpoints = null;
+ TopologyInfo topInfo = this.topInfo;
+
+ if (addrFinder != null || topInfo.topVer == UNKNOWN_TOP_VER) {
+ String[] hostAddrs = addrFinder == null ? addresses : addrFinder.getAddresses();
+
+ if (F.isEmpty(hostAddrs))
+ throw new ClientException("Empty addresses");
+
+ if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
+ endpoints = parsedAddresses(hostAddrs);
+ prevHostAddrs = hostAddrs;
+ }
+ }
+ else if (prevTopVer != topInfo.topVer) {
+ endpoints = topInfo.endpoints;
+ prevTopVer = topInfo.topVer;
+ }
+
+ return endpoints;
+ }
+
+ /**
+ * @return List of host:port_range address lines parsed as {@link InetSocketAddress}.
+ */
+ private static Collection<List<InetSocketAddress>> parsedAddresses(String[] addrs) throws ClientException {
+ if (F.isEmpty(addrs))
+ throw new ClientException("Empty addresses");
+
+ Collection<HostAndPortRange> ranges = new ArrayList<>(addrs.length);
+
+ for (String a : addrs) {
+ try {
+ ranges.add(HostAndPortRange.parse(
+ a,
+ ClientConnectorConfiguration.DFLT_PORT,
+ ClientConnectorConfiguration.DFLT_PORT + ClientConnectorConfiguration.DFLT_PORT_RANGE,
+ "Failed to parse Ignite server address"
+ ));
+ }
+ catch (IgniteCheckedException e) {
+ throw new ClientException(e);
+ }
+ }
+
+ return ranges.stream()
+ .flatMap(r -> IntStream
+ .rangeClosed(r.portFrom(), r.portTo()).boxed()
+ .map(p -> Collections.singletonList(InetSocketAddress.createUnresolved(r.host(), p)))
+ ).collect(Collectors.toList());
+ }
+
+ /** */
+ private static class TopologyInfo {
+ /** */
+ private final long topVer;
+
+ /** */
+ private final Map<UUID, NodeInfo> nodes;
+
+ /** Normalized nodes endpoints. */
+ private final Collection<List<InetSocketAddress>> endpoints;
+
+ /** */
+ private TopologyInfo(long ver, Map<UUID, NodeInfo> nodes) {
+ topVer = ver;
+ this.nodes = nodes;
+ endpoints = normalizeEndpoints(nodes.values());
+ }
+
+ /** Remove duplicates from nodes endpoints. */
+ private static Collection<List<InetSocketAddress>> normalizeEndpoints(Collection<NodeInfo> nodes) {
+ Collection<List<InetSocketAddress>> endpoints = new ArrayList<>(nodes.size());
+ Set<InetSocketAddress> used = new HashSet<>();
+
+ for (NodeInfo nodeInfo : nodes) {
+ List<InetSocketAddress> addrs = new ArrayList<>(nodeInfo.addrs.size());
+
+ // Check each address of each node for intersection with other nodes addresses.
+ for (String host : nodeInfo.addrs) {
+ InetSocketAddress addr = InetSocketAddress.createUnresolved(host, nodeInfo.port);
+
+ if (used.add(addr))
+ addrs.add(addr);
+ }
+
+ if (!addrs.isEmpty())
+ endpoints.add(addrs);
+ }
+
+ return Collections.unmodifiableCollection(endpoints);
+ }
+ }
+
+ /** */
+ private static class NodeInfo {
+ /** */
+ private final int port;
+
+ /** */
+ private final List<String> addrs;
+
+ /** */
+ private NodeInfo(int port, List<String> addrs) {
+ this.port = port;
+ this.addrs = addrs;
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index c291a0e3e9a..a683154a235 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -198,6 +198,9 @@ public enum ClientOperation {
/** Get nodes info by IDs. */
CLUSTER_GROUP_GET_NODE_INFO(5101),
+ /** Get nodes endpoints. */
+ CLUSTER_GROUP_GET_NODE_ENDPOINTS(5102),
+
/** Execute compute task. */
COMPUTE_TASK_EXECUTE(6000),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index d172142a1dd..3a586ec4952 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -37,6 +37,9 @@ public enum ProtocolBitmaskFeature {
*/
CLUSTER_STATES(2),
+ /** Client discovery. */
+ CLUSTER_GROUP_GET_NODES_ENDPOINTS(3),
+
/** Cluster groups. */
CLUSTER_GROUPS(4),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 6207c7fcd75..2bcf6b116c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -19,11 +19,9 @@ package org.apache.ignite.internal.client.thin;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -40,9 +38,7 @@ import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import org.apache.ignite.IgniteBinary;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.client.ClientAuthenticationException;
import org.apache.ignite.client.ClientAuthorizationException;
@@ -54,27 +50,26 @@ import org.apache.ignite.client.ClientRetryPolicy;
import org.apache.ignite.client.ClientRetryPolicyContext;
import org.apache.ignite.client.IgniteClientFuture;
import org.apache.ignite.configuration.ClientConfiguration;
-import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.client.thin.io.gridnioserver.GridNioClientConnectionMultiplexer;
-import org.apache.ignite.internal.util.HostAndPortRange;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.logger.NullLogger;
+import org.jetbrains.annotations.Nullable;
/**
* Communication channel with failover and partition awareness.
*/
final class ReliableChannel implements AutoCloseable {
- /** Do nothing helper function. */
- private static final Consumer<Integer> DO_NOTHING = (v) -> {};
-
/** Channel factory. */
private final BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory;
/** Client channel holders for each configured address. */
private volatile List<ClientChannelHolder> channels;
+ /** Limit of attempts to execute each service. */
+ private volatile int attemptsLimit;
+
/** Index of the current channel. */
private volatile int curChIdx = -1;
@@ -84,6 +79,9 @@ final class ReliableChannel implements AutoCloseable {
/** Cache partition awareness context. */
private final ClientCacheAffinityContext affinityCtx;
+ /** Nodes discovery context. */
+ private final ClientDiscoveryContext discoveryCtx;
+
/** Client configuration. */
private final ClientConfiguration clientCfg;
@@ -117,9 +115,6 @@ final class ReliableChannel implements AutoCloseable {
/** Connection manager. */
private final ClientConnectionMultiplexer connMgr;
- /** Cache addresses returned by {@code ThinClientAddressFinder}. */
- private volatile String[] prevHostAddrs;
-
/** Open channels counter. */
private final AtomicInteger channelsCnt = new AtomicInteger();
@@ -144,6 +139,7 @@ final class ReliableChannel implements AutoCloseable {
partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
affinityCtx = new ClientCacheAffinityContext(binary, clientCfg.getPartitionAwarenessMapperFactory());
+ discoveryCtx = new ClientDiscoveryContext(clientCfg);
connMgr = new GridNioClientConnectionMultiplexer(clientCfg);
connMgr.start();
@@ -200,7 +196,7 @@ final class ReliableChannel implements AutoCloseable {
CompletableFuture<T> fut = new CompletableFuture<>();
// Use the only one attempt to avoid blocking async method.
- handleServiceAsync(fut, op, payloadWriter, payloadReader, 1, null);
+ handleServiceAsync(fut, op, payloadWriter, payloadReader, new ArrayList<>());
return new IgniteClientFutureImpl<>(fut);
}
@@ -208,34 +204,35 @@ final class ReliableChannel implements AutoCloseable {
/**
* Handles serviceAsync results and retries as needed.
*/
- private <T> void handleServiceAsync(final CompletableFuture<T> fut,
- ClientOperation op,
- Consumer<PayloadOutputChannel> payloadWriter,
- Function<PayloadInputChannel, T> payloadReader,
- int attemptsLimit,
- ClientConnectionException failure) {
- ClientChannel ch;
- // Workaround to store used attempts value within lambda body.
- int attemptsCnt[] = new int[1];
-
+ private <T> void handleServiceAsync(
+ final CompletableFuture<T> fut,
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader,
+ List<ClientConnectionException> failures
+ ) {
try {
- ch = applyOnDefaultChannel(channel -> channel, null, attemptsLimit, v -> attemptsCnt[0] = v);
+ applyOnDefaultChannel(
+ channel -> applyOnClientChannelAsync(fut, channel, op, payloadWriter, payloadReader, failures),
+ null,
+ failures
+ );
}
catch (Throwable ex) {
- if (failure != null) {
- failure.addSuppressed(ex);
-
- fut.completeExceptionally(failure);
-
- return;
- }
-
fut.completeExceptionally(ex);
-
- return;
}
+ }
- ch
+ /** */
+ private <T> Object applyOnClientChannelAsync(
+ final CompletableFuture<T> fut,
+ ClientChannel ch,
+ ClientOperation op,
+ Consumer<PayloadOutputChannel> payloadWriter,
+ Function<PayloadInputChannel, T> payloadReader,
+ List<ClientConnectionException> failures
+ ) {
+ return ch
.serviceAsync(op, payloadWriter, payloadReader)
.handle((res, err) -> {
if (err == null) {
@@ -244,12 +241,14 @@ final class ReliableChannel implements AutoCloseable {
return null;
}
- ClientConnectionException failure0 = failure;
-
if (err instanceof ClientConnectionException) {
+ ClientConnectionException failure0 = (ClientConnectionException)err;
+
+ failures.add(failure0);
+
try {
// Will try to reinit channels if topology changed.
- onChannelFailure(ch, err);
+ onChannelFailure(ch, err, failures);
}
catch (Throwable ex) {
fut.completeExceptionally(ex);
@@ -257,31 +256,16 @@ final class ReliableChannel implements AutoCloseable {
return null;
}
- if (failure0 == null)
- failure0 = (ClientConnectionException)err;
- else
- failure0.addSuppressed(err);
-
- int attempt = attemptsCnt[0];
- int leftAttempts = attemptsLimit - attempt;
-
- // If it is a first retry then reset attempts (as for initialization we use only 1 attempt).
- if (failure == null)
- leftAttempts = getRetryLimit() - 1;
-
- if (leftAttempts > 0 && shouldRetry(op, attempt, failure0)) {
- handleServiceAsync(fut, op, payloadWriter, payloadReader, leftAttempts, failure0);
+ if (failures.size() < attemptsLimit && shouldRetry(op, failures.size() - 1, failure0)) {
+ handleServiceAsync(fut, op, payloadWriter, payloadReader, failures);
return null;
}
- }
- else {
- fut.completeExceptionally(err instanceof ClientException ? err : new ClientException(err));
- return null;
+ fut.completeExceptionally(composeException(failures));
}
-
- fut.completeExceptionally(failure0);
+ else
+ fut.completeExceptionally(err instanceof ClientException ? err : new ClientException(err));
return null;
});
@@ -378,47 +362,17 @@ final class ReliableChannel implements AutoCloseable {
if (affNodeId != null) {
CompletableFuture<T> fut = new CompletableFuture<>();
+ List<ClientConnectionException> failures = new ArrayList<>();
- Object result = applyOnNodeChannel(affNodeId, channel ->
- channel
- .serviceAsync(op, payloadWriter, payloadReader)
- .handle((res, err) -> {
- if (err == null) {
- fut.complete(res);
- return null;
- }
-
- try {
- // Will try to reinit channels if topology changed.
- onChannelFailure(channel, err);
- }
- catch (Throwable ex) {
- fut.completeExceptionally(ex);
- return null;
- }
-
- if (err instanceof ClientConnectionException) {
- ClientConnectionException failure = (ClientConnectionException)err;
-
- int attemptsLimit = getRetryLimit() - 1;
-
- if (attemptsLimit == 0 || !shouldRetry(op, 0, failure)) {
- fut.completeExceptionally(err);
- return null;
- }
-
- handleServiceAsync(fut, op, payloadWriter, payloadReader, attemptsLimit, failure);
- return null;
- }
-
- fut.completeExceptionally(err);
- return null;
- }));
+ Object result = applyOnNodeChannel(
+ affNodeId,
+ channel -> applyOnClientChannelAsync(fut, channel, op, payloadWriter, payloadReader, failures),
+ failures
+ );
if (result != null)
return new IgniteClientFutureImpl<>(fut);
}
-
}
return serviceAsync(op, payloadWriter, payloadReader);
@@ -458,6 +412,8 @@ final class ReliableChannel implements AutoCloseable {
if (lastTop == null)
return false;
+ List<ClientConnectionException> failures = new ArrayList<>();
+
for (UUID nodeId : lastTop.nodes()) {
// Abort iterations when topology changed.
if (lastTop != affinityCtx.lastTopology())
@@ -466,13 +422,23 @@ final class ReliableChannel implements AutoCloseable {
Boolean result = applyOnNodeChannel(nodeId, channel ->
channel.service(ClientOperation.CACHE_PARTITIONS,
affinityCtx::writePartitionsUpdateRequest,
- affinityCtx::readPartitionsUpdateResponse)
+ affinityCtx::readPartitionsUpdateResponse),
+ failures
);
- if (result != null)
+ if (result != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Cache partitions mapping updated [cacheId=" + cacheId +
+ ", nodeId=" + nodeId + ']');
+ }
+
return result;
+ }
}
+ log.warning("Failed to update cache partitions mapping [cacheId=" + cacheId + ']',
+ composeException(failures));
+
// There is no one alive node found for last topology version, we should reset affinity context
// to let affinity get updated in case of reconnection to the new cluster (with lower topology
// version).
@@ -491,36 +457,6 @@ final class ReliableChannel implements AutoCloseable {
return true;
}
- /**
- * @return List of host:port_range address lines parsed as {@link InetSocketAddress}.
- */
- private static List<InetSocketAddress> parsedAddresses(String[] addrs) throws ClientException {
- if (F.isEmpty(addrs))
- throw new ClientException("Empty addresses");
-
- Collection<HostAndPortRange> ranges = new ArrayList<>(addrs.length);
-
- for (String a : addrs) {
- try {
- ranges.add(HostAndPortRange.parse(
- a,
- ClientConnectorConfiguration.DFLT_PORT,
- ClientConnectorConfiguration.DFLT_PORT + ClientConnectorConfiguration.DFLT_PORT_RANGE,
- "Failed to parse Ignite server address"
- ));
- }
- catch (IgniteCheckedException e) {
- throw new ClientException(e);
- }
- }
-
- return ranges.stream()
- .flatMap(r -> IntStream
- .rangeClosed(r.portFrom(), r.portTo()).boxed()
- .map(p -> InetSocketAddress.createUnresolved(r.host(), p))
- ).collect(Collectors.toList());
- }
-
/**
* Roll current default channel if specified holder equals to it.
*/
@@ -550,18 +486,23 @@ final class ReliableChannel implements AutoCloseable {
/**
* On current channel failure.
*/
- private void onChannelFailure(ClientChannel ch, Throwable t) {
+ private void onChannelFailure(ClientChannel ch, Throwable t, @Nullable List<ClientConnectionException> failures) {
// There is nothing wrong if curChIdx was concurrently changed, since channel was closed by another thread
// when current index was changed and no other wrong channel will be closed by current thread because
// onChannelFailure checks channel binded to the holder before closing it.
- onChannelFailure(channels.get(curChIdx), ch, t);
+ onChannelFailure(channels.get(curChIdx), ch, t, failures);
}
/**
* On channel of the specified holder failure.
*/
- private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch, Throwable t) {
- log.warning("Channel failure [address=" + hld.chCfg.getAddress() + ", err=" + t.getMessage() + ']', t);
+ private void onChannelFailure(
+ ClientChannelHolder hld,
+ ClientChannel ch,
+ Throwable t,
+ @Nullable List<ClientConnectionException> failures
+ ) {
+ log.warning("Channel failure [channel=" + ch + ", err=" + t.getMessage() + ']', t);
if (ch != null && ch == hld.ch)
hld.closeChannel();
@@ -571,20 +512,15 @@ final class ReliableChannel implements AutoCloseable {
// Roll current channel even if a topology changes. To help find working channel faster.
rollCurrentChannel(hld);
- // For partiton awareness it's already initializing asynchronously in #onTopologyChanged.
- if (addressFinderAddressesChanged() || (scheduledChannelsReinit.get() && !partitionAwarenessEnabled))
- channelsInit();
- }
-
- /**
- * Checks whether addressFinder returns a different set of addresses.
- */
- private boolean addressFinderAddressesChanged() {
- if (clientCfg.getAddressesFinder() == null)
- return false;
-
- String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
- return !Arrays.equals(hostAddrs, prevHostAddrs);
+ if (channelsCnt.get() == 0 && F.size(failures) == attemptsLimit) {
+ // All channels have failed.
+ discoveryCtx.reset();
+ channelsInit(failures);
+ }
+ else if (scheduledChannelsReinit.get() && !partitionAwarenessEnabled) {
+ // For partiton awareness it's already initializing asynchronously in #onTopologyChanged.
+ channelsInit(failures);
+ }
}
/**
@@ -603,7 +539,8 @@ final class ReliableChannel implements AutoCloseable {
hld.getOrCreateChannel(true);
}
catch (Exception e) {
- log.warning("Failed to initialize channel [address=" + hld.chCfg.getAddress() + ", err=" + e.getMessage() + ']', e);
+ log.warning("Failed to initialize channel [addresses=" + hld.getAddresses() + ", err=" +
+ e.getMessage() + ']', e);
}
}
}
@@ -616,12 +553,24 @@ final class ReliableChannel implements AutoCloseable {
* @param ch Channel.
*/
private void onTopologyChanged(ClientChannel ch) {
+ if (log.isDebugEnabled())
+ log.debug("Topology change detected [ch=" + ch + ", top=" + ch.serverTopologyVersion() + ']');
+
if (affinityCtx.updateLastTopologyVersion(ch.serverTopologyVersion(), ch.serverNodeId())) {
- if (scheduledChannelsReinit.compareAndSet(false, true)) {
- // If partition awareness is disabled then only schedule and wait for the default channel to fail.
- if (partitionAwarenessEnabled)
- ForkJoinPool.commonPool().submit(this::channelsInit);
- }
+ ForkJoinPool.commonPool().submit(() -> {
+ try {
+ discoveryCtx.refresh(ch);
+ }
+ catch (ClientException e) {
+ log.warning("Failed to get nodes endpoints", e);
+ }
+
+ if (scheduledChannelsReinit.compareAndSet(false, true)) {
+ // If partition awareness is disabled then only schedule and wait for the default channel to fail.
+ if (partitionAwarenessEnabled)
+ channelsInit();
+ }
+ });
}
}
@@ -632,18 +581,10 @@ final class ReliableChannel implements AutoCloseable {
chFailLsnrs.add(chFailLsnr);
}
- /**
- * Should the channel initialization be stopped.
- */
- private boolean shouldStopChannelsReinit() {
- return scheduledChannelsReinit.get() || closed;
- }
-
/**
* Init channel holders to all nodes.
- * @return boolean wheter channels was reinited.
*/
- synchronized boolean initChannelHolders() {
+ synchronized void initChannelHolders() {
List<ClientChannelHolder> holders = channels;
startChannelsReInit = System.currentTimeMillis();
@@ -651,40 +592,34 @@ final class ReliableChannel implements AutoCloseable {
// Enable parallel threads to schedule new init of channel holders.
scheduledChannelsReinit.set(false);
- List<InetSocketAddress> newAddrs = null;
-
- if (clientCfg.getAddressesFinder() != null) {
- String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
-
- if (hostAddrs.length == 0)
- throw new ClientException("Empty addresses");
-
- if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
- newAddrs = parsedAddresses(hostAddrs);
- prevHostAddrs = hostAddrs;
- }
- }
- else if (holders == null)
- newAddrs = parsedAddresses(clientCfg.getAddresses());
+ Collection<List<InetSocketAddress>> newAddrs = discoveryCtx.getEndpoints();
if (newAddrs == null) {
finishChannelsReInit = System.currentTimeMillis();
- return true;
+ return;
}
Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
- Map<InetSocketAddress, ClientChannelHolder> newHoldersMap = new HashMap<>();
-
- Set<InetSocketAddress> newAddrsSet = new HashSet<>(newAddrs);
+ Set<InetSocketAddress> newAddrsSet = newAddrs.stream().flatMap(Collection::stream).collect(Collectors.toSet());
// Close obsolete holders or map old but valid addresses to holders
if (holders != null) {
for (ClientChannelHolder h : holders) {
- if (newAddrsSet.contains(h.getAddress()))
- curAddrs.put(h.getAddress(), h);
- else
+ boolean found = false;
+
+ for (InetSocketAddress addr : h.getAddresses()) {
+ // If new endpoints contain at least one of channel addresses, don't close this channel.
+ if (newAddrsSet.contains(addr)) {
+ ClientChannelHolder oldHld = curAddrs.putIfAbsent(addr, h);
+
+ if (oldHld == null || oldHld == h) // If not duplicate.
+ found = true;
+ }
+ }
+
+ if (!found)
h.close();
}
}
@@ -702,24 +637,27 @@ final class ReliableChannel implements AutoCloseable {
if (idx != -1)
currDfltHolder = holders.get(idx);
- for (InetSocketAddress addr : newAddrs) {
- if (shouldStopChannelsReinit())
- return false;
+ for (List<InetSocketAddress> addrs : newAddrs) {
+ ClientChannelHolder hld = null;
- // Create new holders for new addrs.
- if (!curAddrs.containsKey(addr)) {
- ClientChannelHolder hld =
- newHoldersMap.
- computeIfAbsent(addr,
- a -> new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, a)));
+ // Try to find already created channel holder.
+ for (InetSocketAddress addr : addrs) {
+ hld = curAddrs.get(addr);
- reinitHolders.add(hld);
+ if (hld != null) {
+ if (!hld.getAddresses().equals(addrs)) // Enrich holder addresses.
+ hld.setConfiguration(new ClientChannelConfiguration(clientCfg, addrs));
- continue;
+ break;
+ }
}
- // This holder is up to date.
- ClientChannelHolder hld = curAddrs.get(addr);
+ if (hld == null) { // If not found, create the new one.
+ hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addrs));
+
+ for (InetSocketAddress addr : addrs)
+ curAddrs.putIfAbsent(addr, hld);
+ }
reinitHolders.add(hld);
@@ -729,12 +667,12 @@ final class ReliableChannel implements AutoCloseable {
if (dfltChannelIdx == -1) {
// If holder is not specified get the random holder from the range of holders with the same port.
- reinitHolders.sort(Comparator.comparingInt(h -> h.getAddress().getPort()));
+ reinitHolders.sort(Comparator.comparingInt(h -> F.first(h.getAddresses()).getPort()));
int limit = 0;
- int port = reinitHolders.get(0).getAddress().getPort();
+ int port = F.first(reinitHolders.get(0).getAddresses()).getPort();
- while (limit + 1 < reinitHolders.size() && reinitHolders.get(limit + 1).getAddress().getPort() == port)
+ while (limit + 1 < reinitHolders.size() && F.first(reinitHolders.get(limit + 1).getAddresses()).getPort() == port)
limit++;
dfltChannelIdx = ThreadLocalRandom.current().nextInt(limit + 1);
@@ -745,6 +683,8 @@ final class ReliableChannel implements AutoCloseable {
try {
channels = reinitHolders;
+ attemptsLimit = getRetryLimit();
+
curChIdx = dfltChannelIdx;
}
finally {
@@ -752,8 +692,6 @@ final class ReliableChannel implements AutoCloseable {
}
finishChannelsReInit = System.currentTimeMillis();
-
- return true;
}
/**
@@ -761,12 +699,28 @@ final class ReliableChannel implements AutoCloseable {
* for every configured server. Otherwise only default channel is connected.
*/
void channelsInit() {
- // Do not establish connections if interrupted.
- if (!initChannelHolders())
- return;
+ channelsInit(null);
+ }
+
+ /**
+ * Establishing connections to servers. If partition awareness feature is enabled connections are created
+ * for every configured server. Otherwise only default channel is connected.
+ */
+ void channelsInit(@Nullable List<ClientConnectionException> failures) {
+ if (log.isDebugEnabled())
+ log.debug("Init channel holders");
+
+ initChannelHolders();
- // Apply no-op function. Establish default channel connection.
- applyOnDefaultChannel(channel -> null, null);
+ if (failures == null || failures.size() < attemptsLimit) {
+ if (channelsCnt.get() == 0) {
+ // Establish default channel connection and retrive nodes endpoints if applicable.
+ if (applyOnDefaultChannel(discoveryCtx::refresh, null, failures))
+ initChannelHolders();
+ }
+ else // Apply no-op function. Establish default channel connection.
+ applyOnDefaultChannel(channel -> null, null, failures);
+ }
if (partitionAwarenessEnabled)
initAllChannelsAsync();
@@ -775,7 +729,11 @@ final class ReliableChannel implements AutoCloseable {
/**
* Apply specified {@code function} on a channel corresponding to specified {@code nodeId}.
*/
- private <T> T applyOnNodeChannel(UUID nodeId, Function<ClientChannel, T> function) {
+ private <T> T applyOnNodeChannel(
+ UUID nodeId,
+ Function<ClientChannel, T> function,
+ @Nullable List<ClientConnectionException> failures
+ ) {
ClientChannelHolder hld = null;
ClientChannel channel = null;
@@ -788,7 +746,12 @@ final class ReliableChannel implements AutoCloseable {
return function.apply(channel);
}
catch (ClientConnectionException e) {
- onChannelFailure(hld, channel, e);
+ if (failures == null)
+ failures = new ArrayList<>();
+
+ failures.add(e);
+
+ onChannelFailure(hld, channel, e, failures);
}
return null;
@@ -796,19 +759,18 @@ final class ReliableChannel implements AutoCloseable {
/** */
<T> T applyOnDefaultChannel(Function<ClientChannel, T> function, ClientOperation op) {
- return applyOnDefaultChannel(function, op, getRetryLimit(), DO_NOTHING);
+ return applyOnDefaultChannel(function, op, null);
}
/**
* Apply specified {@code function} on any of available channel.
*/
- private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function,
- ClientOperation op,
- int attemptsLimit,
- Consumer<Integer> attemptsCallback) {
- ClientConnectionException failure = null;
-
- for (int attempt = 0; attempt < attemptsLimit; attempt++) {
+ private <T> T applyOnDefaultChannel(
+ Function<ClientChannel, T> function,
+ ClientOperation op,
+ @Nullable List<ClientConnectionException> failures
+ ) {
+ while (attemptsLimit > (failures == null ? 0 : failures.size())) {
ClientChannelHolder hld = null;
ClientChannel c = null;
@@ -819,7 +781,7 @@ final class ReliableChannel implements AutoCloseable {
curChannelsGuard.readLock().lock();
try {
- if (!partitionAwarenessEnabled || channelsCnt.get() <= 1 || attempt != 0)
+ if (!partitionAwarenessEnabled || channelsCnt.get() <= 1 || F.size(failures) > 0)
hld = channels.get(curChIdx);
else {
// Make first attempt with the random open channel.
@@ -839,28 +801,53 @@ final class ReliableChannel implements AutoCloseable {
curChannelsGuard.readLock().unlock();
}
- c = hld.getOrCreateChannel();
+ ClientChannel c0 = hld.ch;
- if (c != null) {
- attemptsCallback.accept(attempt + 1);
+ c = hld.getOrCreateChannel();
+ try {
return function.apply(c);
}
+ catch (ClientConnectionException e) {
+ if (c0 == c && partitionAwarenessEnabled) {
+ // In case of stale channel, when partition awareness is enabled, try to reconnect to the
+ // same channel and repeat the operation.
+ onChannelFailure(hld, c, e, failures);
+
+ c = hld.getOrCreateChannel();
+
+ return function.apply(c);
+ }
+ else
+ throw e;
+ }
}
catch (ClientConnectionException e) {
- if (failure == null)
- failure = e;
- else
- failure.addSuppressed(e);
+ if (failures == null)
+ failures = new ArrayList<>();
+
+ failures.add(e);
- onChannelFailure(hld, c, e);
+ onChannelFailure(hld, c, e, failures);
- if (op != null && !shouldRetry(op, attempt, e))
+ if (op != null && !shouldRetry(op, failures.size() - 1, e))
break;
}
}
- throw failure;
+ throw composeException(failures);
+ }
+
+ /** */
+ private ClientConnectionException composeException(List<ClientConnectionException> failures) {
+ if (F.isEmpty(failures))
+ return null;
+
+ ClientConnectionException failure = failures.get(0);
+
+ failures.subList(1, failures.size()).forEach(failure::addSuppressed);
+
+ return failure;
}
/**
@@ -870,7 +857,7 @@ final class ReliableChannel implements AutoCloseable {
private <T> T applyOnNodeChannelWithFallback(UUID tryNodeId, Function<ClientChannel, T> function, ClientOperation op) {
ClientChannelHolder hld = nodeChannels.get(tryNodeId);
- int retryLimit = getRetryLimit();
+ List<ClientConnectionException> failures = null;
if (hld != null) {
ClientChannel channel = null;
@@ -878,21 +865,20 @@ final class ReliableChannel implements AutoCloseable {
try {
channel = hld.getOrCreateChannel();
- if (channel != null)
- return function.apply(channel);
-
+ return function.apply(channel);
}
catch (ClientConnectionException e) {
- onChannelFailure(hld, channel, e);
+ failures = new ArrayList<>();
+ failures.add(e);
- retryLimit -= 1;
+ onChannelFailure(hld, channel, e, failures);
- if (retryLimit == 0 || !shouldRetry(op, 0, e))
+ if (attemptsLimit == 1 || !shouldRetry(op, 0, e))
throw e;
}
}
- return applyOnDefaultChannel(function, op, retryLimit, DO_NOTHING);
+ return applyOnDefaultChannel(function, op, failures);
}
/** Get retry limit. */
@@ -952,7 +938,7 @@ final class ReliableChannel implements AutoCloseable {
@SuppressWarnings("PackageVisibleInnerClass") // Visible for tests.
class ClientChannelHolder {
/** Channel configuration. */
- private final ClientChannelConfiguration chCfg;
+ private volatile ClientChannelConfiguration chCfg;
/** Channel. */
private volatile ClientChannel ch;
@@ -1009,10 +995,13 @@ final class ReliableChannel implements AutoCloseable {
*/
private ClientChannel getOrCreateChannel(boolean ignoreThrottling)
throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
- if (ch == null && !close) {
+ if (close)
+ throw new ClientConnectionException("Channel is closed");
+
+ if (ch == null) {
synchronized (this) {
if (close)
- return null;
+ throw new ClientConnectionException("Channel is closed");
if (ch != null)
return ch;
@@ -1081,10 +1070,17 @@ final class ReliableChannel implements AutoCloseable {
}
/**
- * Get address of the channel. For test purposes.
+ * Get addresses of the channel.
*/
- InetSocketAddress getAddress() {
- return chCfg.getAddress();
+ List<InetSocketAddress> getAddresses() {
+ return chCfg.getAddresses();
+ }
+
+ /**
+ * Set new channel configuration.
+ */
+ void setConfiguration(ClientChannelConfiguration chCfg) {
+ this.chCfg = chCfg;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index 5d2d38f09c4..062d4bd0731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
+import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Timer;
@@ -192,10 +193,39 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
timeout = cfg.getTimeout();
- sock = connMgr.open(cfg.getAddress(), this, this);
+ List<InetSocketAddress> addrs = cfg.getAddresses();
- if (log.isDebugEnabled())
- log.debug("Connection establised: " + cfg.getAddress());
+ ClientConnection sock = null;
+ ClientConnectionException connectionEx = null;
+
+ assert !addrs.isEmpty();
+
+ for (InetSocketAddress addr : addrs) {
+ try {
+ sock = connMgr.open(addr, this, this);
+
+ if (log.isDebugEnabled())
+ log.debug("Connection established: " + addr);
+
+ break;
+ }
+ catch (ClientConnectionException e) {
+ log.info("Can't establish connection with " + addr);
+
+ if (connectionEx != null)
+ connectionEx.addSuppressed(e);
+ else
+ connectionEx = e;
+ }
+ }
+
+ if (sock == null) {
+ assert connectionEx != null;
+
+ throw connectionEx;
+ }
+
+ this.sock = sock;
handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes());
@@ -633,13 +663,18 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
private static void validateConfiguration(ClientChannelConfiguration cfg) {
String error = null;
- InetSocketAddress addr = cfg.getAddress();
+ List<InetSocketAddress> addrs = cfg.getAddresses();
- if (addr == null)
+ if (F.isEmpty(addrs))
error = "At least one Ignite server node must be specified in the Ignite client configuration";
- else if (addr.getPort() < 1024 || addr.getPort() > 49151)
- error = String.format("Ignite client port %s is out of valid ports range 1024...49151", addr.getPort());
- else if (cfg.getHeartbeatInterval() <= 0)
+ else {
+ for (InetSocketAddress addr : addrs) {
+ if (addr.getPort() < 1024 || addr.getPort() > 49151)
+ error = String.format("Ignite client port %s is out of valid ports range 1024...49151", addr.getPort());
+ }
+ }
+
+ if (error == null && cfg.getHeartbeatInterval() <= 0)
error = "heartbeatInterval cannot be zero or less.";
if (error != null)
@@ -885,6 +920,11 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
return res;
}
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "TcpClientChannel [srvNodeId=" + srvNodeId + ", addr=" + sock.remoteAddress() + ']';
+ }
+
/** */
private static class ClientRequestFuture extends GridFutureAdapter<ByteBuffer> {
/** */
diff --git a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 1a077004b8f..14c4dc44eb3 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -17,7 +17,9 @@
package org.apache.ignite.client;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -37,6 +39,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
@@ -48,12 +51,16 @@ import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.internal.client.thin.AbstractThinClientTest;
import org.apache.ignite.internal.client.thin.ClientOperation;
import org.apache.ignite.internal.client.thin.ClientServerError;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Assume;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static org.apache.ignite.events.EventType.EVTS_CACHE;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
@@ -62,30 +69,58 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;
/**
* High Availability tests.
*/
+@RunWith(Parameterized.class)
public class ReliabilityTest extends AbstractThinClientTest {
/** Service name. */
private static final String SERVICE_NAME = "svc";
+ /** Partition awareness. */
+ @Parameterized.Parameter
+ public boolean partitionAware;
+
+ /** Async operations. */
+ @Parameterized.Parameter(1)
+ public boolean async;
+
+ /**
+ * @return List of parameters to test.
+ */
+ @Parameterized.Parameters(name = "partitionAware={0}, async={1}")
+ public static Collection<Object[]> testData() {
+ List<Object[]> res = new ArrayList<>();
+
+ res.add(new Object[] {false, false});
+ res.add(new Object[] {false, true});
+ res.add(new Object[] {true, false});
+ res.add(new Object[] {true, true});
+
+ return res;
+ }
+
/**
* Thin clint failover.
*/
@Test
public void testFailover() throws Exception {
- if (isPartitionAware())
- return;
+ Assume.assumeFalse(partitionAware);
final int CLUSTER_SIZE = 3;
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(CLUSTER_SIZE);
IgniteClient client = Ignition.startClient(getClientConfiguration()
.setReconnectThrottlingRetries(0) // Disable throttling.
- .setAddresses(cluster.clientAddresses().toArray(new String[CLUSTER_SIZE]))
+ // Disable endpoints discovery, since in this test it reduces attempts count and sometimes one extra
+ // attempt is required to complete operation without failure.
+ .setAddressesFinder(new StaticAddressFinder(cluster.clientAddresses().toArray(new String[CLUSTER_SIZE])))
)
) {
final Random rnd = new Random();
final ClientCache<Integer, String> cache = client.getOrCreateCache(
- new ClientCacheConfiguration().setName("testFailover").setCacheMode(CacheMode.REPLICATED)
+ new ClientCacheConfiguration()
+ .setName("testFailover")
+ .setCacheMode(CacheMode.REPLICATED)
+ .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
);
// Simple operation failover: put/get
@@ -183,9 +218,10 @@ public class ReliabilityTest extends AbstractThinClientTest {
public void testSingleServerDuplicatedFailover() throws Exception {
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
IgniteClient client = Ignition.startClient(getClientConfiguration()
- .setAddresses(
- cluster.clientAddresses().iterator().next(),
- cluster.clientAddresses().iterator().next()))
+ .setAddressesFinder(new StaticAddressFinder(
+ F.first(cluster.clientAddresses()),
+ F.first(cluster.clientAddresses())
+ )))
) {
ClientCache<Integer, Integer> cache = client.createCache("cache");
@@ -208,9 +244,10 @@ public class ReliabilityTest extends AbstractThinClientTest {
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
IgniteClient client = Ignition.startClient(getClientConfiguration()
.setRetryPolicy(new ClientRetryReadPolicy())
- .setAddresses(
- cluster.clientAddresses().iterator().next(),
- cluster.clientAddresses().iterator().next()))
+ .setAddressesFinder(new StaticAddressFinder(
+ F.first(cluster.clientAddresses()),
+ F.first(cluster.clientAddresses())
+ )))
) {
ClientCache<Integer, Integer> cache = client.createCache("cache");
@@ -230,15 +267,15 @@ public class ReliabilityTest extends AbstractThinClientTest {
*/
@Test
public void testExceptionInRetryPolicyPropagatesToCaller() {
- if (isPartitionAware())
- return;
+ Assume.assumeFalse(partitionAware);
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
IgniteClient client = Ignition.startClient(getClientConfiguration()
.setRetryPolicy(new ExceptionRetryPolicy())
- .setAddresses(
- cluster.clientAddresses().iterator().next(),
- cluster.clientAddresses().iterator().next()))
+ .setAddressesFinder(new StaticAddressFinder(
+ F.first(cluster.clientAddresses()),
+ F.first(cluster.clientAddresses())
+ )))
) {
ClientCache<Integer, Integer> cache = client.createCache("cache");
dropAllThinClientConnections(Ignition.allGrids().get(0));
@@ -266,9 +303,10 @@ public class ReliabilityTest extends AbstractThinClientTest {
try (LocalIgniteCluster cluster = LocalIgniteCluster.start(1);
IgniteClient client = Ignition.startClient(getClientConfiguration()
.setRetryLimit(1)
- .setAddresses(
- cluster.clientAddresses().iterator().next(),
- cluster.clientAddresses().iterator().next()))
+ .setAddressesFinder(new StaticAddressFinder(
+ F.first(cluster.clientAddresses()),
+ F.first(cluster.clientAddresses())
+ )))
) {
ClientCache<Integer, Integer> cache = client.createCache("cache");
@@ -349,7 +387,7 @@ public class ReliabilityTest extends AbstractThinClientTest {
String nullOpsNames = nullOps.stream().map(Enum::name).collect(Collectors.joining(", "));
- long expectedNullCnt = 20;
+ long expectedNullCnt = 21;
String msg = nullOps.size()
+ " operation codes do not have public equivalent. When adding new codes, update ClientOperationType too. Missing ops: "
@@ -404,8 +442,7 @@ public class ReliabilityTest extends AbstractThinClientTest {
public void testTxWithIdIntersection() throws Exception {
// Partition-aware client connects to all known servers at the start, and dropAllThinClientConnections
// causes failure on all channels, so the logic in this test is not applicable.
- if (isPartitionAware())
- return;
+ Assume.assumeFalse(partitionAware);
int CLUSTER_SIZE = 2;
@@ -473,6 +510,9 @@ public class ReliabilityTest extends AbstractThinClientTest {
@Test
@SuppressWarnings("ThrowableNotThrown")
public void testReconnectionThrottling() throws Exception {
+ // If partition awareness is enabled, channels are restored asynchronously without applying throttling.
+ Assume.assumeFalse(partitionAware);
+
int throttlingRetries = 5;
long throttlingPeriod = 3_000L;
@@ -646,7 +686,19 @@ public class ReliabilityTest extends AbstractThinClientTest {
* @param <V> Val type.
*/
protected <K, V> void cachePut(ClientCache<K, V> cache, K key, V val) {
- cache.put(key, val);
+ if (async) {
+ try {
+ cache.putAsync(key, val).get();
+ }
+ catch (InterruptedException | ExecutionException e) {
+ if (e.getCause() instanceof RuntimeException)
+ throw (RuntimeException)e.getCause();
+
+ throw new RuntimeException(e);
+ }
+ }
+ else
+ cache.put(key, val);
}
/**
@@ -687,11 +739,9 @@ public class ReliabilityTest extends AbstractThinClientTest {
}
}
- /**
- * Returns a value indicating whether partition awareness is enabled.
- */
- protected boolean isPartitionAware() {
- return false;
+ /** {@inheritDoc} */
+ @Override protected boolean isClientPartitionAwarenessEnabled() {
+ return partitionAware;
}
/** */
diff --git a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTestAsync.java b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTestAsync.java
deleted file mode 100644
index f9d7d2631c5..00000000000
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTestAsync.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client;
-
-import java.util.concurrent.ExecutionException;
-
-/**
- * Reliability test with async cache operation.
- */
-public class ReliabilityTestAsync extends ReliabilityTest {
- /** {@inheritDoc} */
- @Override protected <K, V> void cachePut(ClientCache<K, V> cache, K key, V val) {
- try {
- cache.putAsync(key, val).get();
- }
- catch (InterruptedException | ExecutionException e) {
- if (e.getCause() instanceof RuntimeException)
- throw (RuntimeException)e.getCause();
-
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTestPartitionAware.java b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTestPartitionAware.java
deleted file mode 100644
index ebfacea8751..00000000000
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTestPartitionAware.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client;
-
-import org.apache.ignite.configuration.ClientConfiguration;
-
-/**
- * Reliability test with partition awareness.
- */
-public class ReliabilityTestPartitionAware extends ReliabilityTest {
- /** {@inheritDoc} */
- @Override protected ClientConfiguration getClientConfiguration() {
- // In partition-aware mode we connect all channels right away.
- // Otherwise, we connect one channel at a time.
- return super.getClientConfiguration().setPartitionAwarenessEnabled(true);
- }
-
- /** {@inheritDoc} */
- @Override protected boolean isPartitionAware() {
- return true;
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTestPartitionAwareAsync.java b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTestPartitionAwareAsync.java
deleted file mode 100644
index a49cf5a339f..00000000000
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTestPartitionAwareAsync.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client;
-
-import org.apache.ignite.configuration.ClientConfiguration;
-
-/**
- * Reliability test with partition awareness and async operation.
- */
-public class ReliabilityTestPartitionAwareAsync extends ReliabilityTestAsync {
- /** {@inheritDoc} */
- @Override protected ClientConfiguration getClientConfiguration() {
- return super.getClientConfiguration().setPartitionAwarenessEnabled(true);
- }
-
- /** {@inheritDoc} */
- @Override protected boolean isPartitionAware() {
- return true;
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
index e9ac0e6d984..f3cc4f01949 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/AbstractThinClientTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client.thin;
import java.util.Arrays;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
+import org.apache.ignite.client.ClientAddressFinder;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.ClientConfiguration;
@@ -43,8 +44,8 @@ public abstract class AbstractThinClientTest extends GridCommonAbstractTest {
log.setLevel(Level.ALL);
return new ClientConfiguration()
- .setPartitionAwarenessEnabled(false)
- .setLogger(log);
+ .setPartitionAwarenessEnabled(isClientPartitionAwarenessEnabled())
+ .setLogger(log);
}
/**
@@ -61,7 +62,10 @@ public abstract class AbstractThinClientTest extends GridCommonAbstractTest {
addrs[i] = clientHost(node) + ":" + clientPort(node);
}
- return getClientConfiguration().setAddresses(addrs);
+ if (isClientEndpointsDiscoveryEnabled())
+ return getClientConfiguration().setAddresses(addrs);
+ else
+ return getClientConfiguration().setAddressesFinder(new StaticAddressFinder(addrs));
}
/**
@@ -145,4 +149,36 @@ public abstract class AbstractThinClientTest extends GridCommonAbstractTest {
for (Ignite ignite : G.allGrids())
dropAllThinClientConnections(ignite);
}
+
+ /**
+ * Toggles endpoints discovery feature on or off.
+ */
+ protected boolean isClientEndpointsDiscoveryEnabled() {
+ return true;
+ }
+
+ /**
+ * Toggles partition awareness feature on or off.
+ */
+ protected boolean isClientPartitionAwarenessEnabled() {
+ return true;
+ }
+
+ /**
+ * Address finder with static set of addresses, used to disable endpoints discovery.
+ */
+ public static class StaticAddressFinder implements ClientAddressFinder {
+ /** */
+ private final String[] addrs;
+
+ /** */
+ public StaticAddressFinder(String... addrs) {
+ this.addrs = addrs.clone();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String[] getAddresses() {
+ return addrs.clone();
+ }
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java
index 49496bca567..e654b312e37 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/CacheEntryListenersTest.java
@@ -70,6 +70,12 @@ public class CacheEntryListenersTest extends AbstractThinClientTest {
/** Timeout. */
private static final long TIMEOUT = 1_000L;
+ /** */
+ private boolean enpointsDiscoveryEnabled = true;
+
+ /** */
+ private boolean partitionAwarenessEnabled = true;
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
@@ -77,6 +83,12 @@ public class CacheEntryListenersTest extends AbstractThinClientTest {
startGrids(3);
}
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ enpointsDiscoveryEnabled = true;
+ partitionAwarenessEnabled = true;
+ }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName).setClientConnectorConfiguration(
@@ -84,6 +96,16 @@ public class CacheEntryListenersTest extends AbstractThinClientTest {
new ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(100)));
}
+ /** {@inheritDoc} */
+ @Override protected boolean isClientEndpointsDiscoveryEnabled() {
+ return enpointsDiscoveryEnabled;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean isClientPartitionAwarenessEnabled() {
+ return partitionAwarenessEnabled;
+ }
+
/** Test continuous queries. */
@Test
public void testContinuousQueries() throws Exception {
@@ -199,6 +221,9 @@ public class CacheEntryListenersTest extends AbstractThinClientTest {
/** Test continuous queries with page size parameter. */
@Test
public void testContinuousQueriesWithPageSize() throws Exception {
+ // It's required to connect exactly to nodes 1 and 2, without node 0.
+ enpointsDiscoveryEnabled = false;
+
try (IgniteClient client = startClient(1, 2)) {
ClientCache<Integer, Integer> cache = client.getOrCreateCache("testCQWithPageSize");
IgniteCache<Integer, Integer> nodeCache = grid(0).getOrCreateCache(cache.getName());
@@ -230,6 +255,9 @@ public class CacheEntryListenersTest extends AbstractThinClientTest {
/** Test continuous queries with time interval parameter. */
@Test
public void testContinuousQueriesWithTimeInterval() throws Exception {
+ // It's required to connect exactly to nodes 1 and 2, without node 0.
+ enpointsDiscoveryEnabled = false;
+
try (IgniteClient client = startClient(1, 2)) {
ClientCache<Integer, Integer> cache = client.getOrCreateCache("testCQWithTimeInterval");
IgniteCache<Integer, Integer> nodeCache = grid(0).getOrCreateCache(cache.getName());
@@ -602,6 +630,8 @@ public class CacheEntryListenersTest extends AbstractThinClientTest {
/** */
@Test
public void testContinuousQueriesWithConcurrentCompute() throws Exception {
+ partitionAwarenessEnabled = false;
+
try (IgniteClient client = startClient(0, 1, 2)) {
int threadsCnt = 20;
int iterations = 50;
@@ -623,6 +653,10 @@ public class CacheEntryListenersTest extends AbstractThinClientTest {
QueryCursor<?> cur = cache.query(new ContinuousQuery<Integer, Integer>()
.setLocalListener(lsnr));
+ // This call and disable of PA are required to provide happens-before guaranties between
+ // listener start and entry put. Without this call, event, triggered by put, can be skipped.
+ cache.containsKey(0);
+
cache.put(i, i);
Future<T2<UUID, Set<UUID>>> fut = client.compute().executeAsync2(TestTask.class.getName(),
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
index 44b642525b8..0691435c617 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
@@ -80,6 +80,13 @@ public class ComputeTaskTest extends AbstractThinClientTest {
.setClientMode(getTestIgniteInstanceIndex(igniteInstanceName) == 3);
}
+ /** {@inheritDoc} */
+ @Override protected boolean isClientEndpointsDiscoveryEnabled() {
+ // In this test it's critical to connect to the nodes specified in startClient method,
+ // since node for task is checked and one of the nodes doesn't allow tasks execution.
+ return false;
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
index 77038a09a7f..5b9c88e79d7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
@@ -90,7 +90,8 @@ public class ReliableChannelTest {
assertEquals(ClientConnectorConfiguration.DFLT_PORT_RANGE + 1, rc.getChannelHolders().size());
- assertEquals(ClientConnectorConfiguration.DFLT_PORT, F.first(rc.getChannelHolders()).getAddress().getPort());
+ assertEquals(ClientConnectorConfiguration.DFLT_PORT,
+ F.first(F.first(rc.getChannelHolders()).getAddresses()).getPort());
assertEquals(0, rc.getCurrentChannelIndex());
}
@@ -118,7 +119,7 @@ public class ReliableChannelTest {
rc.channelsInit();
- usedChannels.add(rc.getChannelHolders().get(rc.getCurrentChannelIndex()).getAddress().toString());
+ usedChannels.add(F.first(rc.getChannelHolders().get(rc.getCurrentChannelIndex()).getAddresses()).toString());
}
return usedChannels;
@@ -143,7 +144,7 @@ public class ReliableChannelTest {
ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
Supplier<List<String>> holderAddresses = () -> rc.getChannelHolders().stream()
- .map(h -> h.getAddress().toString())
+ .map(h -> F.first(h.getAddresses()).toString())
.sorted()
.collect(Collectors.toList());
@@ -403,7 +404,7 @@ public class ReliableChannelTest {
/** {@inheritDoc} */
@Override public ProtocolContext protocolCtx() {
- return null;
+ return new ProtocolContext(ProtocolVersion.LATEST_VER, null);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index 9ffaa31a84a..0d0a2eadb54 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
@@ -52,7 +53,7 @@ import static org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_
@SuppressWarnings("rawtypes")
public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommonAbstractTest {
/** Wait timeout. */
- private static final long WAIT_TIMEOUT = 5_000L;
+ protected static final long WAIT_TIMEOUT = 5_000L;
/** Replicated cache name. */
protected static final String REPL_CACHE_NAME = "replicated_cache";
@@ -195,7 +196,7 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
protected void initClient(ClientConfiguration clientCfg, int... chIdxs) throws IgniteInterruptedCheckedException {
client = new TcpIgniteClient((cfg, hnd) -> {
try {
- log.info("Establishing connection to " + cfg.getAddress());
+ log.info("Establishing connection to " + cfg.getAddresses());
TcpClientChannel ch = new TestTcpClientChannel(cfg, hnd);
@@ -330,7 +331,7 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
this.cfg = cfg;
- int chIdx = cfg.getAddress().getPort() - DFLT_PORT;
+ int chIdx = F.first(cfg.getAddresses()).getPort() - DFLT_PORT;
channels[chIdx] = this;
@@ -343,8 +344,11 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
Function<PayloadInputChannel, T> payloadReader) throws ClientException {
T res = super.service(op, payloadWriter, payloadReader);
- // Store all operations except binary type registration in queue to check later.
- if (op != ClientOperation.REGISTER_BINARY_TYPE_NAME && op != ClientOperation.PUT_BINARY_TYPE)
+ // Store all operations except some implicit system ops in queue to check later.
+ if (op != ClientOperation.REGISTER_BINARY_TYPE_NAME
+ && op != ClientOperation.PUT_BINARY_TYPE
+ && op != ClientOperation.CLUSTER_GROUP_GET_NODE_ENDPOINTS
+ )
opsQueue.offer(new T2<>(this, op));
return res;
@@ -356,8 +360,11 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader)
throws ClientException {
- // Store all operations except binary type registration in queue to check later.
- if (op != ClientOperation.REGISTER_BINARY_TYPE_NAME && op != ClientOperation.PUT_BINARY_TYPE)
+ // Store all operations except some implicit system ops in queue to check later.
+ if (op != ClientOperation.REGISTER_BINARY_TYPE_NAME
+ && op != ClientOperation.PUT_BINARY_TYPE
+ && op != ClientOperation.CLUSTER_GROUP_GET_NODE_ENDPOINTS
+ )
opsQueue.offer(new T2<>(this, op));
return super.serviceAsync(op, payloadWriter, payloadReader);
@@ -379,7 +386,7 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
/** {@inheritDoc} */
@Override public String toString() {
- return cfg.getAddress().toString();
+ return cfg.getAddresses().toString();
}
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java
new file mode 100644
index 00000000000..d0849134e0a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Test endpoints discovery by thin client.
+ */
+public class ThinClientEnpointsDiscoveryTest extends ThinClientAbstractPartitionAwarenessTest {
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** */
+ @Test
+ public void testEndpointsDiscovery() throws Exception {
+ startGrids(3);
+
+ // Set only subset of nodes to connect, but wait for init of all nodes channels (other nodes should be discovered).
+ initClient(getClientConfiguration(0, 4), 0, 1, 2);
+
+ stopGrid(0);
+
+ detectTopologyChange();
+
+ // Address of stopped node removed.
+ assertTrue(GridTestUtils.waitForCondition(() -> channels[0].isClosed(), WAIT_TIMEOUT));
+
+ channels[0] = null;
+
+ startGrid(0);
+
+ startGrid(3);
+
+ detectTopologyChange();
+
+ // Addresses of new nodes discovered.
+ awaitChannelsInit(0, 3);
+ }
+
+ /** */
+ @Test
+ public void testDiscoveryAfterAllNodesFailed() throws Exception {
+ startGrids(2);
+
+ awaitPartitionMapExchange();
+
+ initClient(getClientConfiguration(0), 0, 1);
+
+ Integer key = primaryKey(grid(1).cache(PART_CACHE_NAME));
+
+ // Any request to cache through any channel to initialize cache's partitions map.
+ client.cache(PART_CACHE_NAME).get(0);
+
+ assertOpOnChannel(null, ClientOperation.CACHE_PARTITIONS);
+ assertOpOnChannel(null, ClientOperation.CACHE_GET);
+
+ stopGrid(0);
+
+ // Send request through channel 1 to ensure that channel 0 is closed due to discovered topology change
+ // (not by failure on channel 0).
+ client.cache(PART_CACHE_NAME).put(key, key);
+
+ assertOpOnChannel(channels[1], ClientOperation.CACHE_PUT);
+
+ assertTrue(GridTestUtils.waitForCondition(() -> channels[0].isClosed(), WAIT_TIMEOUT));
+
+ channels[0] = null;
+
+ // At this moment we know only address of node 1.
+ stopGrid(1);
+
+ try {
+ detectTopologyChange();
+
+ fail();
+ }
+ catch (ClientConnectionException ignore) {
+ // Expected.
+ }
+
+ startGrid(0);
+
+ // We should be able to connect to node 0 again.
+ detectTopologyChange();
+
+ awaitChannelsInit(0);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
index b94062b0988..087e1c6a3ba 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessStableTopologyTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.client.thin;
+import java.util.Arrays;
import java.util.function.Function;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -32,6 +33,7 @@ import org.apache.ignite.client.ClientIgniteSet;
import org.apache.ignite.client.ClientPartitionAwarenessMapper;
import org.apache.ignite.client.ClientPartitionAwarenessMapperFactory;
import org.apache.ignite.configuration.AtomicConfiguration;
+import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongEx;
import org.junit.Test;
@@ -60,6 +62,15 @@ public class ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
initClient(getClientConfiguration(1, 2, 3), 1, 2);
}
+ /** {@inheritDoc} */
+ @Override protected ClientConfiguration getClientConfiguration(int... nodeIdxs) {
+ ClientConfiguration cfg = super.getClientConfiguration(nodeIdxs);
+
+ // To cover more cases, we need undiscovered nodes for this test, so disable endpoints discovery
+ // by setting addresses finder.
+ return cfg.setAddressesFinder(cfg::getAddresses);
+ }
+
/**
* Test that partition awareness is not applicable for replicated cache.
*/
@@ -84,6 +95,8 @@ public class ThinClientPartitionAwarenessStableTopologyTest extends ThinClientAb
public void testPartitionedCustomAffinityCacheWithMapper() throws Exception {
client.close();
+ Arrays.fill(channels, null);
+
initClient(getClientConfiguration(1, 2, 3)
.setPartitionAwarenessMapperFactory(new ClientPartitionAwarenessMapperFactory() {
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientRequestEventListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientRequestEventListenerTest.java
index 1e704752b64..be5b2f26da4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientRequestEventListenerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientRequestEventListenerTest.java
@@ -59,22 +59,29 @@ public class IgniteClientRequestEventListenerTest extends AbstractThinClientTest
return super.getClientConfiguration()
.setEventListeners(new RequestEventListener() {
@Override public void onRequestStart(RequestStartEvent event) {
- if (event.operationCode() != ClientOperation.GET_BINARY_CONFIGURATION.code())
+ if (!isImplicitOperation(event.operationCode()))
evSet.put(event.getClass(), event);
}
@Override public void onRequestSuccess(RequestSuccessEvent event) {
- if (event.operationCode() != ClientOperation.GET_BINARY_CONFIGURATION.code())
+ if (!isImplicitOperation(event.operationCode()))
evSet.put(event.getClass(), event);
}
@Override public void onRequestFail(RequestFailEvent event) {
- if (event.operationCode() != ClientOperation.GET_BINARY_CONFIGURATION.code())
+ if (!isImplicitOperation(event.operationCode()))
evSet.put(event.getClass(), event);
}
});
}
+ /** */
+ private boolean isImplicitOperation(short code) {
+ return code == ClientOperation.GET_BINARY_CONFIGURATION.code()
+ || code == ClientOperation.CACHE_PARTITIONS.code()
+ || code == ClientOperation.CLUSTER_GROUP_GET_NODE_ENDPOINTS.code();
+ }
+
/** */
@Test
public void testQuerySuccessEvents() {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientListenerMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientListenerMetricsTest.java
index fa3e93758d1..b17309a08a8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientListenerMetricsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/odbc/ClientListenerMetricsTest.java
@@ -163,9 +163,11 @@ public class ClientListenerMetricsTest extends GridCommonAbstractTest {
/** */
private static ClientConfiguration getClientConfiguration() {
return new ClientConfiguration()
- .setAddresses(Config.SERVER)
- .setSendBufferSize(0)
- .setReceiveBufferSize(0);
+ .setAddresses(Config.SERVER)
+ // When PA is enabled, async client channel init executes and spoils the metrics.
+ .setPartitionAwarenessEnabled(false)
+ .setSendBufferSize(0)
+ .setReceiveBufferSize(0);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java
index 978e237253c..769c1776a22 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java
@@ -87,7 +87,9 @@ public class PerformanceStatisticsThinClientTest extends AbstractPerformanceStat
ignite.compute().localDeployTask(TestTask.class, TestTask.class.getClassLoader());
- thinClient = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER));
+ thinClient = Ignition.startClient(new ClientConfiguration()
+ // Disable endpoints discovery, required connection to exact one node (node 0).
+ .setAddressesFinder(() -> new String[] {Config.SERVER}));
}
/** {@inheritDoc} */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index a6f6ae7ee76..5c4c610f882 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.client.thin.OptimizedMarshallerClassesCachedTe
import org.apache.ignite.internal.client.thin.ReliableChannelTest;
import org.apache.ignite.internal.client.thin.ServicesBinaryArraysTests;
import org.apache.ignite.internal.client.thin.ServicesTest;
+import org.apache.ignite.internal.client.thin.ThinClientEnpointsDiscoveryTest;
import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessBalancingTest;
import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessDiscoveryTest;
import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResourceReleaseTest;
@@ -53,9 +54,6 @@ import org.junit.runners.Suite;
IgniteBinaryTest.class,
LoadTest.class,
ReliabilityTest.class,
- ReliabilityTestAsync.class,
- ReliabilityTestPartitionAware.class,
- ReliabilityTestPartitionAwareAsync.class,
SecurityTest.class,
FunctionalQueryTest.class,
IgniteBinaryQueryTest.class,
@@ -84,7 +82,8 @@ import org.junit.runners.Suite;
DataReplicationOperationsTest.class,
MetadataRegistrationTest.class,
IgniteClientConnectionEventListenerTest.class,
- IgniteClientRequestEventListenerTest.class
+ IgniteClientRequestEventListenerTest.class,
+ ThinClientEnpointsDiscoveryTest.class,
})
public class ClientTestSuite {
// No-op.
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index f8a45c4a77c..f317401f752 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -1582,7 +1582,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
ignite().events().localListen(lsnr, EVT_SQL_QUERY_EXECUTION);
- ClientConfiguration cc = new ClientConfiguration().setAddresses(Config.SERVER);
+ ClientConfiguration cc = new ClientConfiguration().setAddressesFinder(() -> new String[] {Config.SERVER});
try (IgniteClient client = Ignition.startClient(cc)) {
client.query(new SqlFieldsQuery("create table TEST_TABLE(key int primary key, val int)"))
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
index 99ed62fdae9..4149acb8b72 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
@@ -487,14 +487,14 @@ class KillCommandsTests {
*/
public static void doTestCancelClientConnection(List<IgniteEx> srvs, BiConsumer<UUID, Long> cliCanceler) {
ClientConfiguration cfg = new ClientConfiguration()
- .setAddresses("127.0.0.1:" + srvs.get(0).localNode().attribute(CLIENT_LISTENER_PORT))
+ .setAddressesFinder(() -> new String[] {"127.0.0.1:" + srvs.get(0).localNode().attribute(CLIENT_LISTENER_PORT)})
.setPartitionAwarenessEnabled(false);
IgniteClient cli0 = Ignition.startClient(cfg);
IgniteClient cli1 = Ignition.startClient(cfg);
IgniteClient cli2 = Ignition.startClient(cfg);
IgniteClient cli3 = Ignition.startClient(new ClientConfiguration()
- .setAddresses("127.0.0.1:" + srvs.get(1).localNode().attribute(CLIENT_LISTENER_PORT))
+ .setAddressesFinder(() -> new String[] {"127.0.0.1:" + srvs.get(1).localNode().attribute(CLIENT_LISTENER_PORT)})
.setPartitionAwarenessEnabled(false));
assertEquals(ClusterState.ACTIVE, cli0.cluster().state());
diff --git a/modules/kubernetes/src/test/java/org/apache/ignite/kubernetes/discovery/TestClusterClientConnection.java b/modules/kubernetes/src/test/java/org/apache/ignite/kubernetes/discovery/TestClusterClientConnection.java
index b70c1b88b4d..28de2644e77 100644
--- a/modules/kubernetes/src/test/java/org/apache/ignite/kubernetes/discovery/TestClusterClientConnection.java
+++ b/modules/kubernetes/src/test/java/org/apache/ignite/kubernetes/discovery/TestClusterClientConnection.java
@@ -18,6 +18,7 @@
package org.apache.ignite.kubernetes.discovery;
import org.apache.ignite.Ignition;
+import org.apache.ignite.client.ClientAddressFinder;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.ThinClientKubernetesAddressFinder;
@@ -67,7 +68,9 @@ public class TestClusterClientConnection extends KubernetesDiscoveryAbstractTest
mockServerResponse(crdAddr);
ClientConfiguration ccfg = new ClientConfiguration();
- ccfg.setAddressesFinder(new ThinClientKubernetesAddressFinder(prepareConfiguration()));
+ DelegatingAddressFinder addrFinder = new DelegatingAddressFinder();
+ ccfg.setAddressesFinder(addrFinder);
+ addrFinder.delegate = new ThinClientKubernetesAddressFinder(prepareConfiguration());
IgniteClient client = Ignition.startClient(ccfg);
ClientCache cache = client.createCache("cache");
@@ -77,7 +80,7 @@ public class TestClusterClientConnection extends KubernetesDiscoveryAbstractTest
// Stop node and change port => still can connect.
Ignition.stop(crd.name(), true);
int newPort = 10801;
- ccfg.setAddressesFinder(new ThinClientKubernetesAddressFinder(prepareConfiguration(), newPort));
+ addrFinder.delegate = new ThinClientKubernetesAddressFinder(prepareConfiguration(), newPort);
mockServerResponse(5, crdAddr);
cfg = getConfiguration(getTestIgniteInstanceName(), false);
@@ -87,6 +90,8 @@ public class TestClusterClientConnection extends KubernetesDiscoveryAbstractTest
try {
cache = client.getOrCreateCache("cache");
cache.put(1, 3);
+
+ fail();
}
catch (Exception ignored) {
// No-op.
@@ -96,4 +101,15 @@ public class TestClusterClientConnection extends KubernetesDiscoveryAbstractTest
cache.put(1, 3);
assertEquals(3, cache.get(1));
}
+
+ /** */
+ private static class DelegatingAddressFinder implements ClientAddressFinder {
+ /** */
+ private ClientAddressFinder delegate;
+
+ /** {@inheritDoc} */
+ @Override public String[] getAddresses() {
+ return delegate.getAddresses();
+ }
+ }
}