You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/09/22 13:19:08 UTC
[ignite-3] branch main updated: IGNITE-17725 Implement Partition Awareness in Java client (#1112)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 95c2881604 IGNITE-17725 Implement Partition Awareness in Java client (#1112)
95c2881604 is described below
commit 95c28816040ad9fb3ed95d9f43e0ca0715887fa1
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Thu Sep 22 16:19:03 2022 +0300
IGNITE-17725 Implement Partition Awareness in Java client (#1112)
* Add `PARTITION_ASSIGNMENT_GET` operation to the protocol.
* Add flags to every server -> client response. First flag indicates changed partition assignment.
* Add partition awareness to `ClientRecordBinaryView.get` (other APIs to be added in IGNITE-17739).
https://cwiki.apache.org/confluence/display/IGNITE/IEP-95+Client+Partition+Awareness
---
.../ignite/internal/client/proto/ClientOp.java | 3 +
.../internal/client/proto/ResponseFlags.java | 52 +++++
.../ignite/client/handler/ItClientHandlerTest.java | 6 +-
.../ignite/client/handler/ClientHandlerModule.java | 6 +-
.../handler/ClientInboundMessageHandler.java | 29 ++-
.../handler/requests/table/ClientTableCommon.java | 6 +-
.../ClientTablePartitionAssignmentGetRequest.java | 62 ++++++
modules/client/README.md | 1 -
.../ignite/internal/client/ClientChannel.java | 8 +
.../apache/ignite/internal/client/ClientUtils.java | 3 +
.../ignite/internal/client/ReliableChannel.java | 95 +++++---
.../ignite/internal/client/TcpClientChannel.java | 20 ++
.../internal/client/compute/ClientCompute.java | 2 +-
.../ignite/internal/client/table/ClientColumn.java | 32 ++-
.../client/table/ClientRecordBinaryView.java | 23 +-
.../ignite/internal/client/table/ClientSchema.java | 19 ++
.../ignite/internal/client/table/ClientTable.java | 78 ++++++-
.../apache/ignite/client/AbstractClientTest.java | 32 ++-
.../org/apache/ignite/client/ClientTupleTest.java | 84 +++----
.../ignite/client/PartitionAwarenessTest.java | 244 +++++++++++++++++++++
.../org/apache/ignite/client/RetryPolicyTest.java | 2 +-
.../ignite/client/TestClientHandlerModule.java | 3 +-
.../java/org/apache/ignite/client/TestServer.java | 35 ++-
.../org/apache/ignite/client/fakes/FakeIgnite.java | 44 +++-
.../ignite/client/fakes/FakeIgniteTables.java | 113 +++++++++-
.../ignite/client/fakes/FakeInternalTable.java | 49 +++++
.../Compute/ComputeClusterAwarenessTests.cs | 2 +-
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 1 +
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 3 +
.../app/client/ItAbstractThinClientTest.java | 8 +-
.../internal/table/IgniteTablesInternal.java | 30 ++-
.../internal/table/distributed/TableManager.java | 36 ++-
.../distributed/storage/InternalTableImpl.java | 2 +-
33 files changed, 1012 insertions(+), 121 deletions(-)
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index a3a42d3923..35f01e0cd2 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -137,4 +137,7 @@ public class ClientOp {
/** Close cursor. */
public static final int SQL_CURSOR_CLOSE = 52;
+
+ /** Close cursor. */
+ public static final int PARTITION_ASSIGNMENT_GET = 53;
}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ResponseFlags.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ResponseFlags.java
new file mode 100644
index 0000000000..308bb2cad4
--- /dev/null
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ResponseFlags.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.proto;
+
+/**
+ * Response flags utils.
+ */
+public class ResponseFlags {
+ /** Partitions assignment flag. */
+ private static final int PARTITION_ASSIGNMENT_FLAG = 1;
+
+ /**
+ * Gets flags as int.
+ *
+ * @param partitionAssignmentChanged Assignment changed flag.
+ * @return Flags as int.
+ */
+ public static int getFlags(boolean partitionAssignmentChanged) {
+ var flags = 0;
+
+ if (partitionAssignmentChanged) {
+ flags |= PARTITION_ASSIGNMENT_FLAG;
+ }
+
+ return flags;
+ }
+
+ /**
+ * Gets partition assignment flag value.
+ *
+ * @param flags Flags.
+ * @return Whether partition assignment has changed.
+ */
+ public static boolean getPartitionAssignmentChangedFlag(int flags) {
+ return (flags & PARTITION_ASSIGNMENT_FLAG) == PARTITION_ASSIGNMENT_FLAG;
+ }
+}
diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 4c45f98aae..b9f5e296ed 100644
--- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -41,10 +41,10 @@ import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -229,8 +229,8 @@ public class ItClientHandlerTest {
Mockito.when(clusterService.topologyService().localMember().id()).thenReturn("id");
Mockito.when(clusterService.topologyService().localMember().name()).thenReturn("consistent-id");
- var module = new ClientHandlerModule(mock(QueryProcessor.class), mock(IgniteTables.class), mock(IgniteTransactions.class), registry,
- mock(IgniteCompute.class), clusterService, bootstrapFactory, mock(IgniteSql.class));
+ var module = new ClientHandlerModule(mock(QueryProcessor.class), mock(IgniteTablesInternal.class), mock(IgniteTransactions.class),
+ registry, mock(IgniteCompute.class), clusterService, bootstrapFactory, mock(IgniteSql.class));
module.start();
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 1369c1f015..723dcfc302 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -38,12 +38,12 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
/**
@@ -57,7 +57,7 @@ public class ClientHandlerModule implements IgniteComponent {
private final ConfigurationRegistry registry;
/** Ignite tables API. */
- private final IgniteTables igniteTables;
+ private final IgniteTablesInternal igniteTables;
/** Ignite transactions API. */
private final IgniteTransactions igniteTransactions;
@@ -93,7 +93,7 @@ public class ClientHandlerModule implements IgniteComponent {
*/
public ClientHandlerModule(
QueryProcessor queryProcessor,
- IgniteTables igniteTables,
+ IgniteTablesInternal igniteTables,
IgniteTransactions igniteTransactions,
ConfigurationRegistry registry,
IgniteCompute igniteCompute,
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 01710eafa3..5876e6e283 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.BitSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.client.handler.requests.cluster.ClientClusterGetNodesRequest;
import org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteColocatedRequest;
import org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest;
@@ -48,6 +49,7 @@ import org.apache.ignite.client.handler.requests.sql.ClientSqlCursorNextPageRequ
import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteRequest;
import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTablePartitionAssignmentGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTablesGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleContainsKeyRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteAllExactRequest;
@@ -75,19 +77,20 @@ import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
+import org.apache.ignite.internal.client.proto.ResponseFlags;
import org.apache.ignite.internal.client.proto.ServerMessageType;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
import org.apache.ignite.internal.jdbc.proto.JdbcQueryEventHandler;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
/**
@@ -99,7 +102,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
private static final IgniteLogger LOG = Loggers.forClass(ClientInboundMessageHandler.class);
/** Ignite tables API. */
- private final IgniteTables igniteTables;
+ private final IgniteTablesInternal igniteTables;
/** Ignite tables API. */
private final IgniteTransactions igniteTransactions;
@@ -128,6 +131,9 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
/** Context. */
private ClientContext clientContext;
+ /** Whether the partition assignment has changed since the last server response. */
+ private final AtomicBoolean partitionAssignmentChanged = new AtomicBoolean();
+
/**
* Constructor.
*
@@ -139,7 +145,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
* @param clusterService Cluster.
*/
public ClientInboundMessageHandler(
- IgniteTables igniteTables,
+ IgniteTablesInternal igniteTables,
IgniteTransactions igniteTransactions,
QueryProcessor processor,
ClientConnectorView configuration,
@@ -163,6 +169,8 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
jdbcQueryEventHandler = new JdbcQueryEventHandlerImpl(processor, new JdbcMetadataCatalog(igniteTables), resources);
jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources);
+
+ igniteTables.addAssignmentsChangeListener(this::onPartitionAssignmentChanged);
}
/** {@inheritDoc} */
@@ -185,6 +193,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
resources.close();
+ igniteTables.removeAssignmentsChangeListener(this::onPartitionAssignmentChanged);
super.channelInactive(ctx);
}
@@ -263,6 +272,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
packer.packInt(ServerMessageType.RESPONSE);
packer.packLong(requestId);
+ writeFlags(packer);
writeErrorCore(err, packer);
@@ -320,6 +330,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
out.packInt(ServerMessageType.RESPONSE);
out.packLong(requestId);
+ writeFlags(out);
out.packNil(); // No error.
var fut = processOperation(in, out, opCode);
@@ -469,11 +480,23 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
case ClientOp.SQL_CURSOR_CLOSE:
return ClientSqlCursorCloseRequest.process(in, resources);
+ case ClientOp.PARTITION_ASSIGNMENT_GET:
+ return ClientTablePartitionAssignmentGetRequest.process(in, out, igniteTables);
+
default:
throw new IgniteException(PROTOCOL_ERR, "Unexpected operation code: " + opCode);
}
}
+ private void writeFlags(ClientMessagePacker out) {
+ var flags = ResponseFlags.getFlags(partitionAssignmentChanged.compareAndSet(true, false));
+ out.packInt(flags);
+ }
+
+ private void onPartitionAssignmentChanged(IgniteTablesInternal tables) {
+ partitionAssignmentChanged.set(true);
+ }
+
/** {@inheritDoc} */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index dd45979d68..9f9b680d73 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -30,6 +30,7 @@ import java.time.LocalTime;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
+import java.util.Set;
import java.util.UUID;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
@@ -80,14 +81,17 @@ public class ClientTableCommon {
var colCnt = schema.columnNames().size();
packer.packArrayHeader(colCnt);
+ var colocationCols = Set.of(schema.colocationColumns());
+
for (var colIdx = 0; colIdx < colCnt; colIdx++) {
var col = schema.column(colIdx);
- packer.packArrayHeader(5);
+ packer.packArrayHeader(6);
packer.packString(col.name());
packer.packInt(getClientDataType(col.type().spec()));
packer.packBoolean(schema.isKeyColumn(colIdx));
packer.packBoolean(col.nullable());
+ packer.packBoolean(colocationCols.contains(col));
packer.packInt(getDecimalScale(col.type()));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablePartitionAssignmentGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablePartitionAssignmentGetRequest.java
new file mode 100644
index 0000000000..d40a74a650
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablePartitionAssignmentGetRequest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Client partition assignment retrieval request.
+ */
+public class ClientTablePartitionAssignmentGetRequest {
+ /**
+ * Processes the request.
+ *
+ * @param in Unpacker.
+ * @param out Packer.
+ * @param tables Ignite tables.
+ * @return Future.
+ * @throws IgniteException When schema registry is no initialized.
+ */
+ public static CompletableFuture<Object> process(
+ ClientMessageUnpacker in,
+ ClientMessagePacker out,
+ IgniteTablesInternal tables
+ ) throws NodeStoppingException {
+ UUID tableId = in.unpackUuid();
+ var assignment = tables.assignments(tableId);
+
+ if (assignment == null) {
+ out.packArrayHeader(0);
+ return null;
+ }
+
+ out.packArrayHeader(assignment.size());
+
+ for (String leaderNodeId : assignment) {
+ out.packString(leaderNodeId);
+ }
+
+ return null;
+ }
+}
diff --git a/modules/client/README.md b/modules/client/README.md
index 0e81cf86e5..8ad3de8ace 100644
--- a/modules/client/README.md
+++ b/modules/client/README.md
@@ -1,4 +1,3 @@
# Ignite Client module
This module contains ignite Java thin client.
-
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
index cffeda7470..46895e3ac1 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.client;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
/**
* Processing thin client requests and responses.
@@ -51,4 +52,11 @@ public interface ClientChannel extends AutoCloseable {
* @return Protocol context.
*/
public ProtocolContext protocolContext();
+
+ /**
+ * Add topology change listener.
+ *
+ * @param listener Listener.
+ */
+ public void addTopologyAssignmentChangeListener(Consumer<ClientChannel> listener);
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index 25a2ee3d8e..fea4e78e20 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -170,6 +170,9 @@ public class ClientUtils {
case ClientOp.SQL_CURSOR_CLOSE:
return null;
+ case ClientOp.PARTITION_ASSIGNMENT_GET:
+ return null;
+
// Do not return null from default arm intentionally, so we don't forget to update this when new ClientOp values are added.
default:
throw new UnsupportedOperationException("Invalid op code: " + opCode);
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 4912e3b65e..915bbaf2e1 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
@@ -69,7 +70,10 @@ public final class ReliableChannel implements AutoCloseable {
private final IgniteClientConfiguration clientCfg;
/** Node channels. */
- private final Map<String, ClientChannelHolder> nodeChannels = new ConcurrentHashMap<>();
+ private final Map<String, ClientChannelHolder> nodeChannelsByName = new ConcurrentHashMap<>();
+
+ /** Node channels. */
+ private final Map<String, ClientChannelHolder> nodeChannelsById = new ConcurrentHashMap<>();
/** Channels reinit was scheduled. */
private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
@@ -91,6 +95,10 @@ public final class ReliableChannel implements AutoCloseable {
/** Cache addresses returned by {@code ThinClientAddressFinder}. */
private volatile String[] prevHostAddrs;
+ /** Local topology assignment version. Instead of using event handlers to notify all tables about assignment change,
+ * the table will compare its version with channel version to detect an update. */
+ private final AtomicLong assignmentVersion = new AtomicLong();
+
/**
* Constructor.
*
@@ -131,7 +139,7 @@ public final class ReliableChannel implements AutoCloseable {
public List<ClusterNode> connections() {
List<ClusterNode> res = new ArrayList<>(channels.size());
- for (var holder : nodeChannels.values()) {
+ for (var holder : nodeChannelsByName.values()) {
var ch = holder.ch;
if (ch != null) {
@@ -151,18 +159,21 @@ public final class ReliableChannel implements AutoCloseable {
* @param <T> response type.
* @param preferredNodeName Unique name (consistent id) of the preferred target node. When a connection to the specified node exists,
* it will be used to handle the request; otherwise, default connection will be used.
+ * @param preferredNodeId ID of the preferred target node. When a connection to the specified node exists,
+ * it will be used to handle the request; otherwise, default connection will be used.
* @return Future for the operation.
*/
public <T> CompletableFuture<T> serviceAsync(
int opCode,
PayloadWriter payloadWriter,
PayloadReader<T> payloadReader,
- String preferredNodeName
+ String preferredNodeName,
+ String preferredNodeId
) {
CompletableFuture<T> fut = new CompletableFuture<>();
// Use the only one attempt to avoid blocking async method.
- handleServiceAsync(fut, opCode, payloadWriter, payloadReader, preferredNodeName, null, 0);
+ handleServiceAsync(fut, opCode, payloadWriter, payloadReader, preferredNodeName, preferredNodeId, null, 0);
return fut;
}
@@ -181,7 +192,7 @@ public final class ReliableChannel implements AutoCloseable {
PayloadWriter payloadWriter,
PayloadReader<T> payloadReader
) {
- return serviceAsync(opCode, payloadWriter, payloadReader, null);
+ return serviceAsync(opCode, payloadWriter, payloadReader, null, null);
}
/**
@@ -193,7 +204,7 @@ public final class ReliableChannel implements AutoCloseable {
* @return Future for the operation.
*/
public <T> CompletableFuture<T> serviceAsync(int opCode, PayloadReader<T> payloadReader) {
- return serviceAsync(opCode, null, payloadReader, null);
+ return serviceAsync(opCode, null, payloadReader, null, null);
}
private <T> void handleServiceAsync(final CompletableFuture<T> fut,
@@ -201,19 +212,23 @@ public final class ReliableChannel implements AutoCloseable {
PayloadWriter payloadWriter,
PayloadReader<T> payloadReader,
String preferredNodeName,
+ String preferredNodeId,
IgniteClientConnectionException failure,
int attempt) {
ClientChannel ch = null;
+ ClientChannelHolder holder = null;
if (preferredNodeName != null) {
- var holder = nodeChannels.get(preferredNodeName);
+ holder = nodeChannelsByName.get(preferredNodeName);
+ } else if (preferredNodeId != null) {
+ holder = nodeChannelsById.get(preferredNodeId);
+ }
- if (holder != null) {
- try {
- ch = holder.getOrCreateChannel();
- } catch (Throwable ignored) {
- // Ignore.
- }
+ if (holder != null) {
+ try {
+ ch = holder.getOrCreateChannel();
+ } catch (Throwable ignored) {
+ // Ignore.
}
}
@@ -274,7 +289,7 @@ public final class ReliableChannel implements AutoCloseable {
log.debug("Going to retry request because of error [opCode={}, currentAttempt={}, errMsg={}]",
failure0, opCode, attempt, failure0.getMessage());
- handleServiceAsync(fut, opCode, payloadWriter, payloadReader, null, failure0, attempt + 1);
+ handleServiceAsync(fut, opCode, payloadWriter, payloadReader, null, null, failure0, attempt + 1);
return null;
}
@@ -606,6 +621,25 @@ public final class ReliableChannel implements AutoCloseable {
);
}
+ private void onTopologyAssignmentChanged(ClientChannel clientChannel) {
+ // NOTE: Multiple channels will send the same update to us, resulting in multiple cache invalidations.
+ // This could be solved with a cluster-wide AssignmentVersion, but we don't have that.
+ // So we only react to updates from the default channel. When no user-initiated operations are performed on the default
+ // channel, heartbeat messages will trigger updates.
+ if (clientChannel == channels.get(curChIdx).ch) {
+ assignmentVersion.incrementAndGet();
+ }
+ }
+
+ /**
+ * Gets the local partition assignment version.
+ *
+ * @return Assignment version.
+ */
+ public long partitionAssignmentVersion() {
+ return assignmentVersion.get();
+ }
+
/**
* Channels holder.
*/
@@ -617,8 +651,8 @@ public final class ReliableChannel implements AutoCloseable {
/** Channel. */
private volatile ClientChannel ch;
- /** ID of the last server node that channel is or was connected to. */
- private volatile String serverNodeId;
+ /** The last server node that channel is or was connected to. */
+ private volatile ClusterNode serverNode;
/** Address that holder is bind to (chCfg.addr) is not in use now. So close the holder. */
private volatile boolean close;
@@ -691,18 +725,23 @@ public final class ReliableChannel implements AutoCloseable {
ch = chFactory.apply(chCfg, connMgr);
- String newNodeId = ch.protocolContext().clusterNode().name();
+ ch.addTopologyAssignmentChangeListener(ReliableChannel.this::onTopologyAssignmentChanged);
+
+ ClusterNode newNode = ch.protocolContext().clusterNode();
// There could be multiple holders map to the same serverNodeId if user provide the same
// address multiple times in configuration.
- nodeChannels.put(newNodeId, this);
+ nodeChannelsByName.put(newNode.name(), this);
+ nodeChannelsById.put(newNode.id(), this);
- if (serverNodeId != null && !serverNodeId.equals(newNodeId)) {
+ var oldServerNode = serverNode;
+ if (oldServerNode != null && !oldServerNode.id().equals(newNode.id())) {
// New node on the old address.
- nodeChannels.remove(serverNodeId, this);
+ nodeChannelsByName.remove(oldServerNode.name(), this);
+ nodeChannelsById.remove(oldServerNode.id(), this);
}
- serverNodeId = newNodeId;
+ serverNode = newNode;
}
}
@@ -720,8 +759,11 @@ public final class ReliableChannel implements AutoCloseable {
// No op.
}
- if (serverNodeId != null) {
- nodeChannels.remove(serverNodeId, this);
+ var oldServerNode = serverNode;
+
+ if (oldServerNode != null) {
+ nodeChannelsByName.remove(oldServerNode.name(), this);
+ nodeChannelsById.remove(oldServerNode.id(), this);
}
ch = null;
@@ -734,8 +776,11 @@ public final class ReliableChannel implements AutoCloseable {
void close() {
close = true;
- if (serverNodeId != null) {
- nodeChannels.remove(serverNodeId, this);
+ var oldServerNode = serverNode;
+
+ if (oldServerNode != null) {
+ nodeChannelsByName.remove(oldServerNode.name(), this);
+ nodeChannelsById.remove(oldServerNode.id(), this);
}
closeChannel();
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index c2dbf2a21a..fca061c674 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -34,11 +34,13 @@ import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import org.apache.ignite.client.IgniteClientConnectionException;
import org.apache.ignite.internal.client.io.ClientConnection;
import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
@@ -49,6 +51,7 @@ import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
+import org.apache.ignite.internal.client.proto.ResponseFlags;
import org.apache.ignite.internal.client.proto.ServerMessageType;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
@@ -82,6 +85,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/** Pending requests. */
private final Map<Long, ClientRequestFuture> pendingReqs = new ConcurrentHashMap<>();
+ /** Topology change listeners. */
+ private final Collection<Consumer<ClientChannel>> assignmentChangeListeners = new CopyOnWriteArrayList<>();
+
/** Closed flag. */
private final AtomicBoolean closed = new AtomicBoolean();
@@ -281,6 +287,14 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
throw new IgniteClientConnectionException(PROTOCOL_ERR, String.format("Unexpected response ID [%s]", resId));
}
+ int flags = unpacker.unpackInt();
+
+ if (ResponseFlags.getPartitionAssignmentChangedFlag(flags)) {
+ for (Consumer<ClientChannel> listener : assignmentChangeListeners) {
+ listener.accept(this);
+ }
+ }
+
if (unpacker.tryUnpackNil()) {
pendingReq.complete(unpacker);
} else {
@@ -334,6 +348,12 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
return protocolCtx;
}
+ /** {@inheritDoc} */
+ @Override
+ public void addTopologyAssignmentChangeListener(Consumer<ClientChannel> listener) {
+ assignmentChangeListeners.add(listener);
+ }
+
private static void validateConfiguration(ClientChannelConfiguration cfg) {
String error = null;
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index e1ad09db15..4ecbff6e8a 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -181,7 +181,7 @@ public class ClientCompute implements IgniteCompute {
w.out().packString(jobClassName);
w.out().packObjectArray(args);
- }, r -> (R) r.in().unpackObjectWithType(), node.name());
+ }, r -> (R) r.in().unpackObjectWithType(), node.name(), null);
}
private ClusterNode randomNode(Set<ClusterNode> nodes) {
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientColumn.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientColumn.java
index 2a1368c3f2..8c72d6653b 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientColumn.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientColumn.java
@@ -35,6 +35,9 @@ public class ClientColumn {
/** Key column flag. */
private final boolean isKey;
+ /** Key column flag. */
+ private final boolean isColocation;
+
/** Index of the column in the schema. */
private final int schemaIndex;
@@ -44,14 +47,15 @@ public class ClientColumn {
/**
* Constructor.
*
- * @param name Column name.
- * @param type Column type code.
- * @param nullable Nullable flag.
- * @param isKey Key column flag.
- * @param schemaIndex Index of the column in the schema.
+ * @param name Column name.
+ * @param type Column type code.
+ * @param nullable Nullable flag.
+ * @param isKey Key column flag.
+ * @param isColocation Colocation column flag.
+ * @param schemaIndex Index of the column in the schema.
*/
- public ClientColumn(String name, int type, boolean nullable, boolean isKey, int schemaIndex) {
- this(name, type, nullable, isKey, schemaIndex, 0);
+ public ClientColumn(String name, int type, boolean nullable, boolean isKey, boolean isColocation, int schemaIndex) {
+ this(name, type, nullable, isKey, isColocation, schemaIndex, 0);
}
/**
@@ -64,7 +68,7 @@ public class ClientColumn {
* @param schemaIndex Index of the column in the schema.
* @param scale Scale of the column, if applicable.
*/
- public ClientColumn(String name, int type, boolean nullable, boolean isKey, int schemaIndex, int scale) {
+ public ClientColumn(String name, int type, boolean nullable, boolean isKey, boolean isColocation, int schemaIndex, int scale) {
assert name != null;
assert schemaIndex >= 0;
@@ -72,6 +76,7 @@ public class ClientColumn {
this.type = type;
this.nullable = nullable;
this.isKey = isKey;
+ this.isColocation = isColocation;
this.schemaIndex = schemaIndex;
this.scale = scale;
}
@@ -101,12 +106,21 @@ public class ClientColumn {
/**
* Gets a value indicating whether this column is a part of key.
*
- * @return Value indicating whether this column is a part of key.
+ * @return Value indicating whether this column is a part of key.a part of key
*/
public boolean key() {
return isKey;
}
+ /**
+ * Gets a value indicating whether this column is a part of colocation key.
+ *
+ * @return Value indicating whether this column is a part of colocation key.
+ */
+ public boolean colocation() {
+ return isColocation;
+ }
+
/**
* Gets the index of the column in the schema.
*
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
index 0e208a387d..9751119b7f 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
@@ -25,8 +25,10 @@ import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.util.HashCalculator;
import org.apache.ignite.table.InvokeProcessor;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
@@ -70,7 +72,9 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
return tbl.doSchemaOutInOpAsync(
ClientOp.TUPLE_GET,
(s, w) -> ser.writeTuple(tx, keyRec, s, w, true),
- (s, r) -> ClientTupleSerializer.readValueTuple(s, r, keyRec));
+ (s, r) -> ClientTupleSerializer.readValueTuple(s, r, keyRec),
+ null,
+ getHashFunction(tx, keyRec));
}
/** {@inheritDoc} */
@@ -354,4 +358,21 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
) {
throw new UnsupportedOperationException("Not implemented yet.");
}
+
+ private Integer getColocationHash(ClientSchema schema, Tuple rec) {
+ var hashCalc = new HashCalculator();
+
+ for (ClientColumn col : schema.colocationColumns()) {
+ Object value = rec.valueOrDefault(col.name(), null);
+ hashCalc.append(value);
+ }
+
+ return hashCalc.hash();
+ }
+
+ @Nullable
+ private Function<ClientSchema, Integer> getHashFunction(@Nullable Transaction tx, @NotNull Tuple rec) {
+ // Disable partition awareness when transaction is used: tx belongs to a default connection.
+ return tx != null ? null : schema -> getColocationHash(schema, rec);
+ }
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
index 89fe3345b2..755b536a28 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
@@ -33,7 +33,9 @@ import static org.apache.ignite.internal.client.proto.ClientDataType.STRING;
import static org.apache.ignite.internal.client.proto.ClientDataType.TIME;
import static org.apache.ignite.internal.client.proto.ClientDataType.TIMESTAMP;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.ignite.internal.client.proto.ClientDataType;
import org.apache.ignite.internal.client.proto.TuplePart;
@@ -60,6 +62,9 @@ public class ClientSchema {
/** Columns. */
private final ClientColumn[] columns;
+ /** Colocation columns. */
+ private final List<ClientColumn> colocationColumns;
+
/** Columns map by name. */
private final Map<String, ClientColumn> map = new HashMap<>();
@@ -75,6 +80,7 @@ public class ClientSchema {
this.ver = ver;
this.columns = columns;
+ this.colocationColumns = new ArrayList<>();
var keyCnt = 0;
@@ -84,6 +90,10 @@ public class ClientSchema {
}
map.put(col.name(), col);
+
+ if (col.colocation()) {
+ colocationColumns.add(col);
+ }
}
keyColumnCount = keyCnt;
@@ -107,6 +117,15 @@ public class ClientSchema {
return columns;
}
+ /**
+ * Returns colocation columns.
+ *
+ * @return Colocation columns.
+ */
+ public List<ClientColumn> colocationColumns() {
+ return colocationColumns;
+ }
+
/**
* Gets a column by name.
*
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 2d1ec8da30..68ac843a7a 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.client.table;
import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
import static org.apache.ignite.lang.ErrorGroups.Common.UNKNOWN_ERR;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -61,6 +63,10 @@ public class ClientTable implements Table {
private final Object latestSchemaLock = new Object();
+ private volatile List<String> partitionAssignment = null;
+
+ private volatile long partitionAssignmentVersion = -1;
+
/**
* Constructor.
*
@@ -175,18 +181,19 @@ public class ClientTable implements Table {
for (int i = 0; i < colCnt; i++) {
var propCnt = in.unpackArrayHeader();
- assert propCnt >= 5;
+ assert propCnt >= 6;
var name = in.unpackString();
var type = in.unpackInt();
var isKey = in.unpackBoolean();
var isNullable = in.unpackBoolean();
+ var isColocation = in.unpackBoolean();
var scale = in.unpackInt();
// Skip unknown extra properties, if any.
- in.skipValues(propCnt - 5);
+ in.skipValues(propCnt - 6);
- var column = new ClientColumn(name, type, isNullable, isKey, i, scale);
+ var column = new ClientColumn(name, type, isNullable, isKey, isColocation, i, scale);
columns[i] = column;
}
@@ -262,6 +269,42 @@ public class ClientTable implements Table {
.thenCompose(t -> loadSchemaAndReadData(t, reader));
}
+ <T> CompletableFuture<T> doSchemaOutInOpAsync(
+ int opCode,
+ BiConsumer<ClientSchema, PayloadOutputChannel> writer,
+ BiFunction<ClientSchema, ClientMessageUnpacker, T> reader,
+ T defaultValue,
+ Function<ClientSchema, Integer> hashFunction
+ ) {
+ CompletableFuture<ClientSchema> schemaFut = getLatestSchema();
+ CompletableFuture<List<String>> partitionsFut = hashFunction == null
+ ? CompletableFuture.completedFuture(null)
+ : getPartitionAssignment();
+
+ return CompletableFuture.allOf(schemaFut, partitionsFut)
+ .thenCompose(v -> {
+ List<String> partitions = partitionsFut.getNow(null);
+ ClientSchema schema = schemaFut.getNow(null);
+
+ String preferredNodeId = null;
+
+ if (partitions != null && partitions.size() > 0 && hashFunction != null) {
+ Integer hash = hashFunction.apply(schema);
+
+ if (hash != null) {
+ preferredNodeId = partitions.get(Math.abs(hash % partitions.size()));
+ }
+ }
+
+ return ch.serviceAsync(opCode,
+ w -> writer.accept(schema, w),
+ r -> readSchemaAndReadData(schema, r.in(), reader, defaultValue),
+ null,
+ preferredNodeId);
+ })
+ .thenCompose(t -> loadSchemaAndReadData(t, reader));
+ }
+
/**
* Performs a schema-based operation.
*
@@ -331,4 +374,33 @@ public class ClientTable implements Table {
return resFut;
}
+
+ private CompletableFuture<List<String>> getPartitionAssignment() {
+ var cached = partitionAssignment;
+
+ if (cached != null && partitionAssignmentVersion == ch.partitionAssignmentVersion()) {
+ return CompletableFuture.completedFuture(cached);
+ }
+
+ return loadPartitionAssignment();
+ }
+
+ private CompletableFuture<List<String>> loadPartitionAssignment() {
+ partitionAssignmentVersion = ch.partitionAssignmentVersion();
+
+ return ch.serviceAsync(ClientOp.PARTITION_ASSIGNMENT_GET,
+ w -> w.out().packUuid(id),
+ r -> {
+ int cnt = r.in().unpackArrayHeader();
+ List<String> res = new ArrayList<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ res.add(r.in().unpackString());
+ }
+
+ partitionAssignment = res;
+
+ return res;
+ });
+ }
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 2f8799f278..1f7ff411b4 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -49,7 +49,7 @@ public abstract class AbstractClientTest {
public static void beforeAll() {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
- server = new FakeIgnite();
+ server = new FakeIgnite("server-1");
testServer = startServer(10800, 10, 0, server);
@@ -72,8 +72,12 @@ public abstract class AbstractClientTest {
*/
@BeforeEach
public void beforeEach() {
- for (var t : server.tables().tables()) {
- server.tables().dropTable(t.name());
+ dropTables(server);
+ }
+
+ protected void dropTables(Ignite ignite) {
+ for (var t : ignite.tables().tables()) {
+ ignite.tables().dropTable(t.name());
}
}
@@ -108,7 +112,27 @@ public abstract class AbstractClientTest {
long idleTimeout,
Ignite ignite
) {
- return new TestServer(port, portRange, idleTimeout, ignite);
+ return startServer(port, portRange, idleTimeout, ignite, null);
+ }
+
+ /**
+ * Returns server.
+ *
+ * @param port Port.
+ * @param portRange Port range.
+ * @param idleTimeout Idle timeout.
+ * @param ignite Ignite.
+ * @param nodeName Node name.
+ * @return Server.
+ */
+ public static TestServer startServer(
+ int port,
+ int portRange,
+ long idleTimeout,
+ Ignite ignite,
+ String nodeName
+ ) {
+ return new TestServer(port, portRange, idleTimeout, ignite, null, nodeName);
}
/**
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientTupleTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientTupleTest.java
index 3f432b8745..48ea253939 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientTupleTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientTupleTest.java
@@ -49,8 +49,8 @@ import org.junit.jupiter.api.Test;
*/
public class ClientTupleTest {
private static final ClientSchema SCHEMA = new ClientSchema(1, new ClientColumn[]{
- new ClientColumn("ID", ClientDataType.INT64, false, true, 0),
- new ClientColumn("NAME", ClientDataType.STRING, false, false, 1)
+ new ClientColumn("ID", ClientDataType.INT64, false, true, true, 0),
+ new ClientColumn("NAME", ClientDataType.STRING, false, false, false, 1)
});
@Test
@@ -139,19 +139,19 @@ public class ClientTupleTest {
@Test
public void testTypedGetters() {
var schema = new ClientSchema(100, new ClientColumn[]{
- new ClientColumn("I8", ClientDataType.INT8, false, false, 0),
- new ClientColumn("I16", ClientDataType.INT16, false, false, 1),
- new ClientColumn("I32", ClientDataType.INT32, false, false, 2),
- new ClientColumn("I64", ClientDataType.INT64, false, false, 3),
- new ClientColumn("FLOAT", ClientDataType.FLOAT, false, false, 4),
- new ClientColumn("DOUBLE", ClientDataType.DOUBLE, false, false, 5),
- new ClientColumn("UUID", ClientDataType.UUID, false, false, 6),
- new ClientColumn("STR", ClientDataType.STRING, false, false, 7),
- new ClientColumn("BITS", ClientDataType.BITMASK, false, false, 8),
- new ClientColumn("TIME", ClientDataType.TIME, false, false, 9),
- new ClientColumn("DATE", ClientDataType.DATE, false, false, 10),
- new ClientColumn("DATETIME", ClientDataType.DATETIME, false, false, 11),
- new ClientColumn("TIMESTAMP", ClientDataType.TIMESTAMP, false, false, 12)
+ new ClientColumn("I8", ClientDataType.INT8, false, false, false, 0),
+ new ClientColumn("I16", ClientDataType.INT16, false, false, false, 1),
+ new ClientColumn("I32", ClientDataType.INT32, false, false, false, 2),
+ new ClientColumn("I64", ClientDataType.INT64, false, false, false, 3),
+ new ClientColumn("FLOAT", ClientDataType.FLOAT, false, false, false, 4),
+ new ClientColumn("DOUBLE", ClientDataType.DOUBLE, false, false, false, 5),
+ new ClientColumn("UUID", ClientDataType.UUID, false, false, false, 6),
+ new ClientColumn("STR", ClientDataType.STRING, false, false, false, 7),
+ new ClientColumn("BITS", ClientDataType.BITMASK, false, false, false, 8),
+ new ClientColumn("TIME", ClientDataType.TIME, false, false, false, 9),
+ new ClientColumn("DATE", ClientDataType.DATE, false, false, false, 10),
+ new ClientColumn("DATETIME", ClientDataType.DATETIME, false, false, false, 11),
+ new ClientColumn("TIMESTAMP", ClientDataType.TIMESTAMP, false, false, false, 12)
});
var uuid = UUID.randomUUID();
@@ -219,7 +219,7 @@ public class ClientTupleTest {
assertEquals(tuple.hashCode(), tuple2.hashCode());
assertNotEquals(new ClientTuple(SCHEMA), new ClientTuple(new ClientSchema(1, new ClientColumn[]{
- new ClientColumn("id", ClientDataType.INT64, false, true, 0)})));
+ new ClientColumn("id", ClientDataType.INT64, false, true, true, 0)})));
assertEquals(new ClientTuple(SCHEMA).set("name", null), new ClientTuple(SCHEMA).set("name", null));
assertEquals(new ClientTuple(SCHEMA).set("name", null).hashCode(), new ClientTuple(SCHEMA).set("name", null).hashCode());
@@ -253,19 +253,19 @@ public class ClientTupleTest {
@Test
public void testTupleEquality() {
var schema = new ClientSchema(100, new ClientColumn[]{
- new ClientColumn("I8", ClientDataType.INT8, false, false, 0),
- new ClientColumn("I16", ClientDataType.INT16, false, false, 1),
- new ClientColumn("I32", ClientDataType.INT32, false, false, 2),
- new ClientColumn("I64", ClientDataType.INT64, false, false, 3),
- new ClientColumn("FLOAT", ClientDataType.FLOAT, false, false, 4),
- new ClientColumn("DOUBLE", ClientDataType.DOUBLE, false, false, 5),
- new ClientColumn("UUID", ClientDataType.UUID, false, false, 6),
- new ClientColumn("STR", ClientDataType.STRING, false, false, 7),
- new ClientColumn("BITS", ClientDataType.BITMASK, false, false, 8),
- new ClientColumn("TIME", ClientDataType.TIME, false, false, 9),
- new ClientColumn("DATE", ClientDataType.DATE, false, false, 10),
- new ClientColumn("DATETIME", ClientDataType.DATETIME, false, false, 11),
- new ClientColumn("TIMESTAMP", ClientDataType.TIMESTAMP, false, false, 12)
+ new ClientColumn("I8", ClientDataType.INT8, false, false, false, 0),
+ new ClientColumn("I16", ClientDataType.INT16, false, false, false, 1),
+ new ClientColumn("I32", ClientDataType.INT32, false, false, false, 2),
+ new ClientColumn("I64", ClientDataType.INT64, false, false, false, 3),
+ new ClientColumn("FLOAT", ClientDataType.FLOAT, false, false, false, 4),
+ new ClientColumn("DOUBLE", ClientDataType.DOUBLE, false, false, false, 5),
+ new ClientColumn("UUID", ClientDataType.UUID, false, false, false, 6),
+ new ClientColumn("STR", ClientDataType.STRING, false, false, false, 7),
+ new ClientColumn("BITS", ClientDataType.BITMASK, false, false, false, 8),
+ new ClientColumn("TIME", ClientDataType.TIME, false, false, false, 9),
+ new ClientColumn("DATE", ClientDataType.DATE, false, false, false, 10),
+ new ClientColumn("DATETIME", ClientDataType.DATETIME, false, false, false, 11),
+ new ClientColumn("TIMESTAMP", ClientDataType.TIMESTAMP, false, false, false, 12)
});
var uuid = UUID.randomUUID();
@@ -306,19 +306,19 @@ public class ClientTupleTest {
@Test
public void testTupleEqualityCompatibility() {
var schema = new ClientSchema(100, new ClientColumn[]{
- new ClientColumn("I8", ClientDataType.INT8, false, false, 0),
- new ClientColumn("I16", ClientDataType.INT16, false, false, 1),
- new ClientColumn("I32", ClientDataType.INT32, false, false, 2),
- new ClientColumn("I64", ClientDataType.INT64, false, false, 3),
- new ClientColumn("FLOAT", ClientDataType.FLOAT, false, false, 4),
- new ClientColumn("DOUBLE", ClientDataType.DOUBLE, false, false, 5),
- new ClientColumn("UUID", ClientDataType.UUID, false, false, 6),
- new ClientColumn("STR", ClientDataType.STRING, false, false, 7),
- new ClientColumn("BITS", ClientDataType.BITMASK, false, false, 8),
- new ClientColumn("TIME", ClientDataType.TIME, false, false, 9),
- new ClientColumn("DATE", ClientDataType.DATE, false, false, 10),
- new ClientColumn("DATETIME", ClientDataType.DATETIME, false, false, 11),
- new ClientColumn("TIMESTAMP", ClientDataType.TIMESTAMP, false, false, 12)
+ new ClientColumn("I8", ClientDataType.INT8, false, false, false, 0),
+ new ClientColumn("I16", ClientDataType.INT16, false, false, false, 1),
+ new ClientColumn("I32", ClientDataType.INT32, false, false, false, 2),
+ new ClientColumn("I64", ClientDataType.INT64, false, false, false, 3),
+ new ClientColumn("FLOAT", ClientDataType.FLOAT, false, false, false, 4),
+ new ClientColumn("DOUBLE", ClientDataType.DOUBLE, false, false, false, 5),
+ new ClientColumn("UUID", ClientDataType.UUID, false, false, false, 6),
+ new ClientColumn("STR", ClientDataType.STRING, false, false, false, 7),
+ new ClientColumn("BITS", ClientDataType.BITMASK, false, false, false, 8),
+ new ClientColumn("TIME", ClientDataType.TIME, false, false, false, 9),
+ new ClientColumn("DATE", ClientDataType.DATE, false, false, false, 10),
+ new ClientColumn("DATETIME", ClientDataType.DATETIME, false, false, false, 11),
+ new ClientColumn("TIMESTAMP", ClientDataType.TIMESTAMP, false, false, false, 12)
});
var uuid = UUID.randomUUID();
diff --git a/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
new file mode 100644
index 0000000000..371a63ccc6
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import io.netty.util.ResourceLeakDetector;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.client.fakes.FakeIgniteTables;
+import org.apache.ignite.client.fakes.FakeInternalTable;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests partition awareness.
+ */
+public class PartitionAwarenessTest extends AbstractClientTest {
+ protected static TestServer testServer2;
+
+ protected static Ignite server2;
+
+ protected static IgniteClient client2;
+
+ protected static int serverPort2;
+
+ private String lastOp;
+
+ private String lastOpServerName;
+
+ /**
+ * Before all.
+ */
+ @BeforeAll
+ public static void beforeAll() {
+ AbstractClientTest.beforeAll();
+
+ ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+
+ server2 = new FakeIgnite("server-2");
+ testServer2 = startServer(10800, 10, 0, server2, "server-2");
+ serverPort2 = testServer2.port();
+
+ var clientBuilder = IgniteClient.builder()
+ .addresses("127.0.0.1:" + serverPort, "127.0.0.1:" + serverPort2)
+ .heartbeatInterval(200);
+
+ client2 = clientBuilder.build();
+ }
+
+ /**
+ * After all.
+ */
+ @AfterAll
+ public static void afterAll() throws Exception {
+ AbstractClientTest.afterAll();
+
+ testServer2.close();
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ dropTables(server);
+ dropTables(server2);
+
+ initPartitionAssignment(null);
+ }
+
+ @Test
+ public void testGetRoutesRequestToPrimaryNode() {
+ RecordView<Tuple> recordView = defaultTable().recordView();
+
+ assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID", 0L)));
+ assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 1L)));
+ assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID", 2L)));
+ assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 3L)));
+ }
+
+ @Test
+ public void testNonNullTxDisablesPartitionAwareness() {
+ RecordView<Tuple> recordView = defaultTable().recordView();
+ var tx = client2.transactions().begin();
+
+ assertOpOnNode("server-2", "get", x -> recordView.get(tx, Tuple.create().set("ID", 0L)));
+ assertOpOnNode("server-2", "get", x -> recordView.get(tx, Tuple.create().set("ID", 1L)));
+ assertOpOnNode("server-2", "get", x -> recordView.get(tx, Tuple.create().set("ID", 2L)));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testClientReceivesPartitionAssignmentUpdates(boolean useHeartbeat) throws InterruptedException {
+ // Check default assignment.
+ RecordView<Tuple> recordView = defaultTable().recordView();
+
+ assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID", 0L)));
+ assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 1L)));
+
+ // Update partition assignment.
+ var assignments = new ArrayList<String>();
+
+ assignments.add(testServer2.nodeId());
+ assignments.add(testServer.nodeId());
+
+ initPartitionAssignment(assignments);
+
+ if (useHeartbeat) {
+ // Wait for heartbeat message to receive change notification flag.
+ Thread.sleep(500);
+ } else {
+ // Perform one request on the default channel to receive change notification flag.
+ client2.tables().tables();
+ }
+
+ // Check new assignment.
+ assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 0L)));
+ assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID", 1L)));
+ }
+
+ @Test
+ public void testCustomColocationKey() {
+ RecordView<Tuple> recordView = table(FakeIgniteTables.TABLE_COLOCATION_KEY).recordView();
+
+ // COLO-2 is nullable and not set.
+ assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 0).set("COLO-1", "0")));
+ assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 2).set("COLO-1", "0")));
+ assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 3).set("COLO-1", "0")));
+ assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID", 3).set("COLO-1", "1")));
+
+ // COLO-2 is set.
+ assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 0).set("COLO-1", "0").set("COLO-2", 1)));
+ assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID", 0).set("COLO-1", "0").set("COLO-2", 2)));
+ }
+
+ @Test
+ public void testCompositeKey() {
+ RecordView<Tuple> recordView = table(FakeIgniteTables.TABLE_COMPOSITE_KEY).recordView();
+
+ assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID1", 0).set("ID2", "0")));
+ assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID1", 1).set("ID2", "0")));
+ assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID1", 0).set("ID2", "1")));
+ assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID1", 1).set("ID2", "1")));
+ assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID1", 1).set("ID2", "2")));
+ }
+
+ @Test
+ public void testAllRecordViewOperations() {
+ // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+ }
+
+ @Test
+ public void testAllRecordBinaryViewOperations() {
+ // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+ }
+
+ @Test
+ public void testAllKeyValueViewOperations() {
+ // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+ }
+
+ @Test
+ public void testAllKeyValueBinaryViewOperations() {
+ // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+ }
+
+ private void assertOpOnNode(String expectedNode, String expectedOp, Consumer<Void> op) {
+ lastOpServerName = null;
+ lastOp = null;
+
+ op.accept(null);
+
+ assertEquals(expectedNode, lastOpServerName);
+ assertEquals(expectedOp, lastOp);
+ }
+
+ private Table defaultTable() {
+ return table(DEFAULT_TABLE);
+ }
+
+ private Table table(String name) {
+ // Create table on both servers with the same ID.
+ var tableId = UUID.randomUUID();
+
+ createTable(server, tableId, name);
+ createTable(server2, tableId, name);
+
+ return client2.tables().table(name);
+ }
+
+ private void createTable(Ignite ignite, UUID id, String name) {
+ FakeIgniteTables tables = (FakeIgniteTables) ignite.tables();
+ TableImpl tableImpl = tables.createTable(name, id);
+
+ ((FakeInternalTable) tableImpl.internalTable()).setDataAccessListener((op, data) -> {
+ lastOp = op;
+ lastOpServerName = ignite.name();
+ });
+ }
+
+ private void initPartitionAssignment(ArrayList<String> assignments) {
+ initPartitionAssignment(server, assignments);
+ initPartitionAssignment(server2, assignments);
+ }
+
+ private void initPartitionAssignment(Ignite ignite, ArrayList<String> assignments) {
+ if (assignments == null) {
+ assignments = new ArrayList<>();
+
+ assignments.add(testServer.nodeId());
+ assignments.add(testServer2.nodeId());
+ assignments.add(testServer.nodeId());
+ assignments.add(testServer2.nodeId());
+ }
+
+ FakeIgniteTables tables = (FakeIgniteTables) ignite.tables();
+
+ tables.setPartitionAssignments(assignments);
+ }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index a40327aec0..8750bbbfeb 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -201,7 +201,7 @@ public class RetryPolicyTest {
}
}
- long expectedNullCount = 17;
+ long expectedNullCount = 18;
String msg = nullOpFields.size()
+ " operation codes do not have public equivalent. When adding new codes, update ClientOperationType too. Missing ops: "
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index 7d7c02b61c..1fbcb28e5c 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
@@ -159,7 +160,7 @@ public class TestClientHandlerModule implements IgniteComponent {
new ClientMessageDecoder(),
new ConnectionDropHandler(requestCounter, shouldDropConnection),
new ClientInboundMessageHandler(
- ignite.tables(),
+ (IgniteTablesInternal) ignite.tables(),
ignite.transactions(),
mock(QueryProcessor.class),
configuration,
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index e782e4de3f..7d397cee20 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -39,6 +39,7 @@ import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NettyBootstrapFactory;
@@ -55,6 +56,8 @@ public class TestServer implements AutoCloseable {
private final NettyBootstrapFactory bootstrapFactory;
+ private final String nodeName;
+
/**
* Constructor.
*
@@ -107,11 +110,13 @@ public class TestServer implements AutoCloseable {
bootstrapFactory.start();
if (nodeName == null) {
- nodeName = "consistent-id";
+ nodeName = "server-1";
}
+ this.nodeName = nodeName;
+
ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
- Mockito.when(clusterService.topologyService().localMember().id()).thenReturn(nodeName + "-id");
+ Mockito.when(clusterService.topologyService().localMember().id()).thenReturn(getNodeId(nodeName));
Mockito.when(clusterService.topologyService().localMember().name()).thenReturn(nodeName);
Mockito.when(clusterService.topologyService().localMember()).thenReturn(getClusterNode(nodeName));
Mockito.when(clusterService.topologyService().getByConsistentId(anyString())).thenAnswer(
@@ -126,7 +131,7 @@ public class TestServer implements AutoCloseable {
? new TestClientHandlerModule(ignite, cfg, bootstrapFactory, shouldDropConnection, clusterService, compute)
: new ClientHandlerModule(
((FakeIgnite) ignite).queryEngine(),
- ignite.tables(),
+ (IgniteTablesInternal) ignite.tables(),
ignite.transactions(),
cfg,
compute,
@@ -151,6 +156,24 @@ public class TestServer implements AutoCloseable {
return ((InetSocketAddress) Objects.requireNonNull(addr)).getPort();
}
+ /**
+ * Gets the node name.
+ *
+ * @return Node name.
+ */
+ public String nodeName() {
+ return nodeName;
+ }
+
+ /**
+ * Gets the node name.
+ *
+ * @return Node name.
+ */
+ public String nodeId() {
+ return getNodeId(nodeName);
+ }
+
/** {@inheritDoc} */
@Override
public void close() throws Exception {
@@ -160,6 +183,10 @@ public class TestServer implements AutoCloseable {
}
private ClusterNode getClusterNode(String name) {
- return new ClusterNode(name + "-id", name, new NetworkAddress("127.0.0.1", 8080));
+ return new ClusterNode(getNodeId(name), name, new NetworkAddress("127.0.0.1", 8080));
+ }
+
+ private static String getNodeId(String name) {
+ return name + "-id";
}
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index 3198aef18a..86a80a3315 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -18,26 +18,44 @@
package org.apache.ignite.client.fakes;
import java.util.Collection;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.NotNull;
/**
* Fake Ignite.
*/
public class FakeIgnite implements Ignite {
+ private final String name;
+
/**
* Default constructor.
*/
public FakeIgnite() {
+ this(null);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param name Name.
+ */
+ public FakeIgnite(String name) {
super();
+ this.name = name;
}
private final IgniteTables tables = new FakeIgniteTables();
@@ -68,7 +86,29 @@ public class FakeIgnite implements Ignite {
@Override
public CompletableFuture<Transaction> beginAsync() {
- return CompletableFuture.completedFuture(new Transaction() {
+ return CompletableFuture.completedFuture(new InternalTransaction() {
+ private final UUID id = UUID.randomUUID();
+
+ @Override
+ public @NotNull UUID id() {
+ return id;
+ }
+
+ @Override
+ public Set<RaftGroupService> enlisted() {
+ return null;
+ }
+
+ @Override
+ public TxState state() {
+ return null;
+ }
+
+ @Override
+ public boolean enlist(RaftGroupService svc) {
+ return false;
+ }
+
@Override
public void commit() throws TransactionException {
@@ -126,6 +166,6 @@ public class FakeIgnite implements Ignite {
/** {@inheritDoc} */
@Override
public String name() {
- return null;
+ return name;
}
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
index 3b430425ca..f872113bf5 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
@@ -19,9 +19,11 @@ package org.apache.ignite.client.fakes;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.configuration.schemas.table.TableChange;
@@ -32,6 +34,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.manager.IgniteTables;
import org.jetbrains.annotations.NotNull;
@@ -48,6 +51,10 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
public static final String TABLE_WITH_DEFAULT_VALUES = "default-columns";
+ public static final String TABLE_COMPOSITE_KEY = "composite-key";
+
+ public static final String TABLE_COLOCATION_KEY = "colocation-key";
+
public static final String BAD_TABLE = "bad-table";
public static final String BAD_TABLE_ERR = "Err!";
@@ -56,10 +63,25 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
private final ConcurrentHashMap<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+ private final CopyOnWriteArrayList<Consumer<IgniteTablesInternal>> assignmentsChangeListeners = new CopyOnWriteArrayList<>();
+
+ private volatile List<String> partitionAssignments = null;
+
/** {@inheritDoc} */
@Override
public Table createTable(String name, Consumer<TableChange> tableInitChange) {
- var newTable = getNewTable(name);
+ return createTable(name, UUID.randomUUID());
+ }
+
+ /**
+ * Creates a table.
+ *
+ * @param name Table name.
+ * @param id Table id.
+ * @return Table.
+ */
+ public TableImpl createTable(String name, UUID id) {
+ var newTable = getNewTable(name, id);
var oldTable = tables.putIfAbsent(name, newTable);
@@ -159,8 +181,43 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
return CompletableFuture.completedFuture(tableImpl(name));
}
+ /** {@inheritDoc} */
+ @Override
+ public List<String> assignments(UUID tableId) throws NodeStoppingException {
+ return partitionAssignments;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void addAssignmentsChangeListener(Consumer<IgniteTablesInternal> listener) {
+ Objects.requireNonNull(listener);
+
+ assignmentsChangeListeners.add(listener);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> listener) {
+ Objects.requireNonNull(listener);
+
+ return assignmentsChangeListeners.remove(listener);
+ }
+
+ /**
+ * Sets partition assignments.
+ *
+ * @param assignments Assignments.
+ */
+ public void setPartitionAssignments(List<String> assignments) {
+ partitionAssignments = assignments;
+
+ for (var listener : assignmentsChangeListeners) {
+ listener.accept(this);
+ }
+ }
+
@NotNull
- private TableImpl getNewTable(String name) {
+ private TableImpl getNewTable(String name, UUID id) {
Function<Integer, SchemaDescriptor> history;
switch (name) {
@@ -176,13 +233,21 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
history = this::getDefaultColumnValuesSchema;
break;
+ case TABLE_COMPOSITE_KEY:
+ history = this::getCompositeKeySchema;
+ break;
+
+ case TABLE_COLOCATION_KEY:
+ history = this::getColocationKeySchema;
+ break;
+
default:
history = this::getSchema;
break;
}
return new TableImpl(
- new FakeInternalTable(name, UUID.randomUUID()),
+ new FakeInternalTable(name, id),
new FakeSchemaRegistry(history)
);
}
@@ -266,6 +331,48 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
});
}
+ /**
+ * Gets the schema.
+ *
+ * @param v Version.
+ * @return Schema descriptor.
+ */
+ private SchemaDescriptor getCompositeKeySchema(Integer v) {
+ return new SchemaDescriptor(
+ v,
+ new Column[]{
+ new Column("ID1", NativeTypes.INT32, false),
+ new Column("ID2", NativeTypes.STRING, false)
+ },
+ new Column[]{
+ new Column("STR", NativeTypes.STRING, true)
+ });
+ }
+
+
+ /**
+ * Gets the schema.
+ *
+ * @param v Version.
+ * @return Schema descriptor.
+ */
+ private SchemaDescriptor getColocationKeySchema(Integer v) {
+ Column colocationCol1 = new Column("COLO-1", NativeTypes.STRING, false);
+ Column colocationCol2 = new Column("COLO-2", NativeTypes.INT64, true);
+
+ return new SchemaDescriptor(
+ v,
+ new Column[]{
+ new Column("ID", NativeTypes.INT32, false),
+ },
+ new String[]{ colocationCol1.name(), colocationCol2.name() },
+ new Column[]{
+ colocationCol1,
+ colocationCol2,
+ new Column("STR", NativeTypes.STRING, true)
+ });
+ }
+
/**
* Gets the schema.
*
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 70fe45f6ba..3c5f46226a 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow.Publisher;
+import java.util.function.BiConsumer;
import javax.naming.OperationNotSupportedException;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -49,6 +50,9 @@ public class FakeInternalTable implements InternalTable {
/** Table data. */
private final ConcurrentHashMap<ByteBuffer, BinaryRow> data = new ConcurrentHashMap<>();
+ /** Data access listener. */
+ private BiConsumer<String, Object> dataAccessListener;
+
/**
* The constructor.
*
@@ -87,6 +91,8 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable InternalTransaction tx) {
+ onDataAccess("get", keyRow);
+
return CompletableFuture.completedFuture(data.get(keyRow.keySlice()));
}
@@ -94,6 +100,8 @@ public class FakeInternalTable implements InternalTable {
@Override
public CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRowEx> keyRows,
@Nullable InternalTransaction tx) {
+ onDataAccess("getAll", keyRows);
+
var res = new ArrayList<BinaryRow>();
for (var key : keyRows) {
@@ -110,6 +118,8 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable InternalTransaction tx) {
+ onDataAccess("upsert", row);
+
data.put(row.keySlice(), row);
return CompletableFuture.completedFuture(null);
@@ -118,6 +128,8 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
+ onDataAccess("upsertAll", rows);
+
for (var row : rows) {
upsert(row, tx);
}
@@ -129,6 +141,8 @@ public class FakeInternalTable implements InternalTable {
@Override
public CompletableFuture<BinaryRow> getAndUpsert(BinaryRowEx row,
@Nullable InternalTransaction tx) {
+ onDataAccess("getAndUpsert", row);
+
var res = get(row, tx);
upsert(row, tx);
@@ -139,6 +153,8 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> insert(BinaryRowEx row, @Nullable InternalTransaction tx) {
+ onDataAccess("insert", row);
+
var old = get(row, tx).getNow(null);
if (old == null) {
@@ -153,6 +169,8 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
+ onDataAccess("insertAll", rows);
+
var skipped = new ArrayList<BinaryRow>();
for (var row : rows) {
@@ -167,6 +185,8 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> replace(BinaryRowEx row, @Nullable InternalTransaction tx) {
+ onDataAccess("replace", row);
+
var old = get(row, tx).getNow(null);
if (old == null) {
@@ -179,6 +199,8 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> replace(BinaryRowEx oldRow, BinaryRowEx newRow, @Nullable InternalTransaction tx) {
+ onDataAccess("replace", oldRow);
+
var old = get(oldRow, tx).getNow(null);
if (old == null || !old.valueSlice().equals(oldRow.valueSlice())) {
@@ -192,6 +214,8 @@ public class FakeInternalTable implements InternalTable {
@Override
public CompletableFuture<BinaryRow> getAndReplace(BinaryRowEx row,
@Nullable InternalTransaction tx) {
+ onDataAccess("getAndReplace", row);
+
var old = get(row, tx);
return replace(row, tx).thenCompose(f -> old);
@@ -200,6 +224,8 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> delete(BinaryRowEx keyRow, @Nullable InternalTransaction tx) {
+ onDataAccess("delete", keyRow);
+
var old = get(keyRow, tx).getNow(null);
if (old != null) {
@@ -212,6 +238,8 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> deleteExact(BinaryRowEx oldRow, @Nullable InternalTransaction tx) {
+ onDataAccess("deleteExact", oldRow);
+
var old = get(oldRow, tx).getNow(null);
if (old != null && old.valueSlice().equals(oldRow.valueSlice())) {
@@ -226,6 +254,8 @@ public class FakeInternalTable implements InternalTable {
@Override
public CompletableFuture<BinaryRow> getAndDelete(BinaryRowEx row,
@Nullable InternalTransaction tx) {
+ onDataAccess("getAndDelete", row);
+
var old = get(row, tx).getNow(null);
if (old != null) {
@@ -238,6 +268,8 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
+ onDataAccess("deleteAll", rows);
+
var skipped = new ArrayList<BinaryRow>();
for (var row : rows) {
@@ -252,6 +284,8 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
+ onDataAccess("deleteAllExact", rows);
+
var skipped = new ArrayList<BinaryRow>();
for (var row : rows) {
@@ -298,4 +332,19 @@ public class FakeInternalTable implements InternalTable {
public void close() throws Exception {
// No-op.
}
+
+ /**
+ * Sets the data access operation listener.
+ *
+ * @param dataAccessListener Data access operation listener.
+ */
+ public void setDataAccessListener(BiConsumer<String, Object> dataAccessListener) {
+ this.dataAccessListener = dataAccessListener;
+ }
+
+ private void onDataAccess(String operation, Object arg) {
+ if (dataAccessListener != null) {
+ dataAccessListener.accept(operation, arg);
+ }
+ }
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
index 24220b2d40..bb94f710ac 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
@@ -93,7 +93,7 @@ namespace Apache.Ignite.Tests.Compute
using var client = await IgniteClient.StartAsync(clientCfg);
// ReSharper disable once AccessToDisposedClosure
- TestUtils.WaitForCondition(() => client.GetConnections().Count == 2);
+ TestUtils.WaitForCondition(() => client.GetConnections().Count == 2, 5000);
for (int i = 0; i < 100; i++)
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 07d5b145c0..318612ff4e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -131,6 +131,7 @@ namespace Apache.Ignite.Tests
writer.Write(0); // Message type.
writer.Write(requestId);
+ writer.Write(0); // Flags.
if (!isError)
{
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index c4d543b9ad..f3a2c1a0ca 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -544,6 +544,9 @@ namespace Apache.Ignite.Internal
return;
}
+ // Skip flags.
+ reader.ReadInt32();
+
var exception = ReadError(ref reader);
if (exception != null)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
index 8d1c95da44..37361d1223 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
@@ -82,7 +82,7 @@ public abstract class ItAbstractThinClientTest extends IgniteAbstractTest {
node0Name,
"{\n"
+ " network.port: 3344,\n"
- + " network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\" ]\n"
+ + " network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+ "}"
);
@@ -90,7 +90,7 @@ public abstract class ItAbstractThinClientTest extends IgniteAbstractTest {
node1Name,
"{\n"
+ " network.port: 3345,\n"
- + " network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\" ]\n"
+ + " network.nodeFinder.netClusterNodes: [ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n"
+ " clientConnector.sendServerExceptionStackTraceToClient: true\n"
+ "}"
);
@@ -170,6 +170,10 @@ public abstract class ItAbstractThinClientTest extends IgniteAbstractTest {
return client;
}
+ protected Ignite server() {
+ return startedNodes.get(0);
+ }
+
/**
* Test class.
*/
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java b/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
index 5237e4fa3b..5e243ea0ae 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
@@ -17,15 +17,18 @@
package org.apache.ignite.internal.table;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.table.manager.IgniteTables;
/**
* Internal tables facade provides low-level methods for table operations.
*/
-public interface IgniteTablesInternal {
+public interface IgniteTablesInternal extends IgniteTables {
/**
* Gets a table by id.
*
@@ -73,4 +76,29 @@ public interface IgniteTablesInternal {
* </ul>
*/
CompletableFuture<TableImpl> tableImplAsync(String name);
+
+ /**
+ * Gets a list of the current table assignments.
+ *
+ * <p>Returns a list where on the i-th place resides a node id that considered as a leader for
+ * the i-th partition on the moment of invocation.
+ *
+ * @param tableId Unique id of a table.
+ * @return List of the current assignments.
+ */
+ List<String> assignments(UUID tableId) throws NodeStoppingException;
+
+ /**
+ * Adds a listener to track changes in {@link #assignments(UUID)}.
+ *
+ * @param listener Listener.
+ */
+ void addAssignmentsChangeListener(Consumer<IgniteTablesInternal> listener);
+
+ /**
+ * Removes assignments change listener.
+ *
+ * @param listener Listener.
+ */
+ boolean removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> listener);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e85aca12cb..89033fa3cb 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -44,11 +44,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
@@ -230,6 +232,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
*/
private final ExecutorService ioExecutor;
+ /** Assignment change event listeners. */
+ private final CopyOnWriteArrayList<Consumer<IgniteTablesInternal>> assignmentsChangeListeners = new CopyOnWriteArrayList<>();
+
/** Rebalance scheduler pool size. */
private static final int REBALANCE_SCHEDULER_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
@@ -549,6 +554,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
busyLock.leaveBusy();
}
+ for (var listener : assignmentsChangeListeners) {
+ listener.accept(this);
+ }
+
return completedFuture(null);
}
@@ -804,15 +813,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
}
- /**
- * Gets a list of the current table assignments.
- *
- * <p>Returns a list where on the i-th place resides a node id that considered as a leader for
- * the i-th partition on the moment of invocation.
- *
- * @param tableId Unique id of a table.
- * @return List of the current assignments.
- */
+ /** {@inheritDoc} */
+ @Override
public List<String> assignments(UUID tableId) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
@@ -824,6 +826,22 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
}
+ /** {@inheritDoc} */
+ @Override
+ public void addAssignmentsChangeListener(Consumer<IgniteTablesInternal> listener) {
+ Objects.requireNonNull(listener);
+
+ assignmentsChangeListeners.add(listener);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean removeAssignmentsChangeListener(Consumer<IgniteTablesInternal> listener) {
+ Objects.requireNonNull(listener);
+
+ return assignmentsChangeListeners.remove(listener);
+ }
+
/**
* Creates local structures for a table.
*
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 47b166b86a..ec13f4f903 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -115,7 +115,7 @@ public class InternalTableImpl implements InternalTable {
private final MvTableStorage tableStorage;
/** Mutex for the partition map update. */
- public Object updatePartMapMux = new Object();
+ private final Object updatePartMapMux = new Object();
/**
* Constructor.