You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/07/23 14:24:08 UTC
[ignite-3] branch main updated: IGNITE-14970 Add basic Java thin
client
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0448b1d IGNITE-14970 Add basic Java thin client
0448b1d is described below
commit 0448b1d679beb32235cc200f26b5d6166590c7ab
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Fri Jul 23 17:23:58 2021 +0300
IGNITE-14970 Add basic Java thin client
Implement basic thin client protocol.
The goal of this change is to have a working connection and `Table#insert` + `Table#get` operations with tuples and tuple builders.
New modules:
* `ignite-client-common`: client/server shared logic.
* `ignite-client-handler`: server-side connection handler.
* `ignite-client`: Ignite thin client (depends on `ignite-api`, `ignite-client-common`, `ignite-core`).
Details:
* Use MsgPack for serialization, Netty for network.
* Reuse some of the client-side logic from 2.x: `TcpClientChannel`, `ReliableChannel`, `HostAndPortRange`.
https://cwiki.apache.org/confluence/display/IGNITE/IEP-76+Thin+Client+Protocol+for+Ignite+3.0
Co-authored-by: Andrew V. Mashenkov <AM...@users.noreply.github.com>
---
.../ClientConnectorConfigurationSchema.java | 42 ++
.../schemas/clientconnector/package-info.java | 21 +
modules/client-common/README.md | 4 +
modules/client-common/pom.xml | 115 ++++
.../apache/ignite/client/proto/ClientDataType.java | 56 ++
.../ignite/client/proto/ClientErrorCode.java | 32 ++
.../ignite/client/proto/ClientMessageDecoder.java | 146 +++++
.../ignite/client/proto/ClientMessageEncoder.java | 47 ++
.../ignite/client/proto/ClientMessagePacker.java | 125 ++++
.../ignite/client/proto/ClientMessageUnpacker.java | 155 +++++
.../ignite/client/proto/ClientMsgPackType.java | 41 ++
.../org/apache/ignite/client/proto/ClientOp.java | 47 ++
.../ignite/client/proto/ProtocolVersion.java | 141 +++++
.../ignite/client/proto/ServerMessageType.java | 29 +
.../apache/ignite/client/proto/package-info.java | 21 +
.../client/proto/ClientMessageDecoderTest.java | 109 ++++
.../client/proto/ClientMessageEncoderTest.java | 53 ++
.../proto/ClientMessagePackerUnpackerTest.java | 50 ++
modules/client-handler/README.md | 4 +
modules/client-handler/pom.xml | 138 +++++
.../ignite/client/handler/ClientContext.java | 82 +++
.../ignite/client/handler/ClientHandlerModule.java | 150 +++++
.../handler/ClientInboundMessageHandler.java | 493 ++++++++++++++++
.../apache/ignite/client/handler/package-info.java | 21 +
.../handler/ClientHandlerIntegrationTest.java | 195 +++++++
.../client/handler/TestConfigurationStorage.java | 75 +++
modules/client/README.md | 4 +
modules/client/pom.xml | 132 +++++
.../org/apache/ignite/client/IgniteClient.java | 120 ++++
.../ignite/client/IgniteClientAddressFinder.java | 33 ++
.../IgniteClientAuthenticationException.java | 45 ++
.../client/IgniteClientAuthorizationException.java | 55 ++
.../ignite/client/IgniteClientConfiguration.java | 57 ++
.../client/IgniteClientConnectionException.java | 45 ++
.../ignite/client/IgniteClientException.java | 96 ++++
...ClientFeatureNotSupportedByServerException.java | 56 ++
.../org/apache/ignite/client/package-info.java | 21 +
.../ignite/internal/client/ClientChannel.java | 52 ++
.../client/ClientChannelConfiguration.java | 113 ++++
.../ignite/internal/client/HostAndPortRange.java | 280 +++++++++
.../internal/client/PayloadInputChannel.java | 60 ++
.../internal/client/PayloadOutputChannel.java | 66 +++
.../ignite/internal/client/PayloadReader.java | 33 ++
.../ignite/internal/client/PayloadWriter.java | 32 ++
.../internal/client/ProtocolBitmaskFeature.java | 91 +++
.../ignite/internal/client/ProtocolContext.java | 78 +++
.../ignite/internal/client/ReliableChannel.java | 626 +++++++++++++++++++++
.../ignite/internal/client/TcpClientChannel.java | 391 +++++++++++++
.../ignite/internal/client/TcpIgniteClient.java | 96 ++++
.../internal/client/io/ClientConnection.java | 41 ++
.../client/io/ClientConnectionMultiplexer.java | 52 ++
.../client/io/ClientConnectionStateHandler.java | 32 ++
.../internal/client/io/ClientMessageHandler.java | 34 ++
.../client/io/netty/NettyClientConnection.java | 90 +++
.../io/netty/NettyClientConnectionMultiplexer.java | 96 ++++
.../client/io/netty/NettyClientMessageHandler.java | 41 ++
.../ignite/internal/client/table/ClientColumn.java | 99 ++++
.../ignite/internal/client/table/ClientSchema.java | 102 ++++
.../ignite/internal/client/table/ClientTable.java | 439 +++++++++++++++
.../ignite/internal/client/table/ClientTables.java | 102 ++++
.../internal/client/table/ClientTupleBuilder.java} | 61 +-
.../apache/ignite/client/AbstractClientTest.java | 110 ++++
.../org/apache/ignite/client/ClientTableTest.java | 87 +++
.../org/apache/ignite/client/ClientTablesTest.java | 103 ++++
.../org/apache/ignite/client/ConnectionTest.java | 70 +++
.../ignite/client/TestConfigurationStorage.java | 75 +++
.../org/apache/ignite/client/fakes/FakeIgnite.java | 45 ++
.../ignite/client/fakes/FakeIgniteTables.java | 133 +++++
.../ignite/client/fakes/FakeInternalTable.java | 144 +++++
.../internal/client/HostAndPortRangeTest.java | 174 ++++++
.../runner/app/LiveSchemaChangeKVViewTest.java | 7 +-
.../runner/app/LiveSchemaChangeTableTest.java | 13 +-
.../apache/ignite/internal/schema/SchemaAware.java | 30 +
.../internal/table/IgniteTablesInternal.java | 33 ++
.../ignite/internal/table/RowChunkAdapter.java | 3 +-
.../org/apache/ignite/internal/table/TableRow.java | 20 +-
.../ignite/internal/table/TupleBuilderImpl.java | 29 +-
.../internal/table/distributed/TableManager.java | 55 +-
parent/pom.xml | 25 +
pom.xml | 3 +
80 files changed, 7253 insertions(+), 69 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java
new file mode 100644
index 0000000..cdfa263
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.clientconnector;
+
+import org.apache.ignite.configuration.annotation.ConfigurationRoot;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Max;
+import org.apache.ignite.configuration.validation.Min;
+
+/**
+ * Configuration schema for thin client connector (local configuration root named {@code clientConnector}).
+ */
+@SuppressWarnings("PMD.UnusedPrivateField")
+@ConfigurationRoot(rootName = "clientConnector", type = ConfigurationType.LOCAL)
+public class ClientConnectorConfigurationSchema {
+ /** TCP port; validated to lie within 1024..65535, defaults to 10800. */
+ @Min(1024)
+ @Max(0xFFFF)
+ @Value(hasDefault = true)
+ public final int port = 10800;
+
+ /** TCP port range (non-negative, default 0). NOTE(review): presumably the number of extra ports to try - confirm. */
+ @Min(0)
+ @Value(hasDefault = true)
+ public final int portRange = 0;
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/package-info.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/package-info.java
new file mode 100644
index 0000000..1fa2d80
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Configuration schemas for thin client components.
+ */
+package org.apache.ignite.configuration.schemas.clientconnector;
\ No newline at end of file
diff --git a/modules/client-common/README.md b/modules/client-common/README.md
new file mode 100644
index 0000000..1d4c6ed
--- /dev/null
+++ b/modules/client-common/README.md
@@ -0,0 +1,4 @@
+# Ignite Client common module
+
+This module contains shared thin client logic (mostly serialization), reused by client and server parts.
+
diff --git a/modules/client-common/pom.xml b/modules/client-common/pom.xml
new file mode 100644
index 0000000..054dbd8
--- /dev/null
+++ b/modules/client-common/pom.xml
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>ignite-client-common</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ </dependency>
+
+ <!-- 3rd party dependencies -->
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>jackson-dataformat-msgpack</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientDataType.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientDataType.java
new file mode 100644
index 0000000..45d1280
--- /dev/null
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientDataType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.proto;
+
+/**
+ * Client data type codes; {@code ClientMessageUnpacker#unpackObject} uses them to choose how a value is read.
+ */
+public class ClientDataType {
+ /** 8-bit integer, read as a {@code byte}. */
+ public static final int INT8 = 1;
+
+ /** 16-bit integer, read as a {@code short}. */
+ public static final int INT16 = 2;
+
+ /** 32-bit integer, read as an {@code int}. */
+ public static final int INT32 = 3;
+
+ /** 64-bit integer, read as a {@code long}. */
+ public static final int INT64 = 4;
+
+ /** Floating point value, read as a {@code float}. */
+ public static final int FLOAT = 5;
+
+ /** Floating point value, read as a {@code double}. */
+ public static final int DOUBLE = 6;
+
+ /** Decimal, read as a {@code BigDecimal}. */
+ public static final int DECIMAL = 7;
+
+ /** UUID, read as a {@code java.util.UUID}. */
+ public static final int UUID = 8;
+
+ /** String. */
+ public static final int STRING = 9;
+
+ /** Byte array ({@code byte[]}). */
+ public static final int BYTES = 10;
+
+ /** Bit mask, read as a {@code BitSet}. */
+ public static final int BITMASK = 11;
+}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientErrorCode.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientErrorCode.java
new file mode 100644
index 0000000..9c1759a
--- /dev/null
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientErrorCode.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.proto;
+
+/**
+ * Client error codes, declared as {@code int} constants; {@link #SUCCESS} (0) means no error.
+ */
+public class ClientErrorCode {
+ /** Operation succeeded (no error). */
+ public static final int SUCCESS = 0;
+
+ /** General error (uncategorized). */
+ public static final int FAILED = 1;
+
+ /** Authentication or authorization failure. */
+ public static final int AUTH_FAILED = 2;
+}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageDecoder.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageDecoder.java
new file mode 100644
index 0000000..df42f40
--- /dev/null
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageDecoder.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.proto;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.CharsetUtil;
+import org.apache.ignite.lang.IgniteException;
+
+/**
+ * Decodes full client messages:
+ * 1. MAGIC for first message.
+ * 2. Payload length (4-byte int).
+ * 3. Payload (bytes).
+ */
+public class ClientMessageDecoder extends ByteToMessageDecoder {
+ /** Magic bytes before handshake. */
+ public static final byte[] MAGIC_BYTES = new byte[]{0x49, 0x47, 0x4E, 0x49}; // IGNI
+
+ /** Data buffer. */
+ private byte[] data = new byte[4]; // TODO: Pooled buffers IGNITE-15162.
+
+ /** Number of payload bytes read so far; negative while the length of the next message is not known yet. */
+ private int cnt = -4;
+
+ /** Message size. */
+ private int msgSize = -1;
+
+ /** Magic decoded flag. */
+ private boolean magicDecoded;
+
+ /** Magic decoding failed flag. */
+ private boolean magicFailed;
+
+ /** {@inheritDoc} */
+ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) {
+ if (!readMagic(byteBuf))
+ return;
+
+ while (read(byteBuf))
+ list.add(ByteBuffer.wrap(data));
+ }
+
+ /**
+ * Checks the magic header for the first message.
+ *
+ * @param byteBuf Buffer.
+ * @return {@code true} when magic header has been received and is valid, {@code false} otherwise.
+ * @throws IgniteException When magic is invalid.
+ */
+ private boolean readMagic(ByteBuf byteBuf) {
+ if (magicFailed)
+ return false;
+
+ if (magicDecoded)
+ return true;
+
+ if (byteBuf.readableBytes() < MAGIC_BYTES.length)
+ return false;
+
+ assert data.length == MAGIC_BYTES.length;
+
+ byteBuf.readBytes(data, 0, MAGIC_BYTES.length);
+
+ magicDecoded = true;
+ cnt = -1;
+ msgSize = 0;
+
+ if (Arrays.equals(data, MAGIC_BYTES))
+ return true;
+
+ magicFailed = true;
+
+ throw new IgniteException("Invalid magic header in thin client connection. " +
+ "Expected 'IGNI', but was '" + new String(data, CharsetUtil.US_ASCII) + "'.");
+ }
+
+ /**
+ * Reads the 4-byte length (when not read yet) and then as much of the payload as the buffer holds.
+ *
+ * @param buf Buffer.
+ * @return True when a complete message has been received; false otherwise.
+ * @throws IgniteException when the received message size is negative.
+ */
+ private boolean read(ByteBuf buf) {
+ if (buf.readableBytes() == 0)
+ return false;
+
+ if (cnt < 0) {
+ if (buf.readableBytes() < 4)
+ return false;
+
+ msgSize = buf.readInt();
+
+ // The size comes from the peer and asserts are normally disabled: validate it explicitly.
+ if (msgSize < 0)
+ throw new IgniteException("Invalid message size in thin client connection: " + msgSize);
+
+ data = new byte[msgSize];
+ cnt = 0;
+ }
+
+ assert data != null && msgSize >= 0;
+
+ // Copy as much of the still missing payload as the buffer currently holds.
+ int remaining = buf.readableBytes();
+ int missing = msgSize - cnt;
+
+ if (remaining > 0 && missing > 0) {
+ int len = Math.min(missing, remaining);
+
+ buf.readBytes(data, cnt, len);
+
+ cnt += len;
+ }
+
+ if (cnt == msgSize) {
+ cnt = -1;
+ msgSize = -1;
+
+ return true;
+ }
+
+ return false;
+ }
+}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageEncoder.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageEncoder.java
new file mode 100644
index 0000000..a2dc56a
--- /dev/null
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageEncoder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.proto;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+/**
+ * Encodes client messages:
+ * 1. MAGIC for first message.
+ * 2. Payload length (4-byte int).
+ * 3. Payload (bytes).
+ */
+public class ClientMessageEncoder extends MessageToByteEncoder<ByteBuffer> {
+ /** Whether the magic bytes have already been written (they precede the first message only). */
+ private boolean magicEncoded;
+
+ /** {@inheritDoc} */
+ @Override protected void encode(ChannelHandlerContext ctx, ByteBuffer message, ByteBuf out) {
+ if (!magicEncoded) {
+ out.writeBytes(ClientMessageDecoder.MAGIC_BYTES);
+
+ magicEncoded = true;
+ }
+
+ out.writeInt(message.remaining());
+ out.writeBytes(message);
+ }
+}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java
new file mode 100644
index 0000000..98fa6ce
--- /dev/null
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.proto;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.UUID;
+
+import org.msgpack.core.MessageBufferPacker;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.buffer.ArrayBufferOutput;
+
+/**
+ * MsgPack packer with Ignite-specific extensions (UUID; decimal and bit set are not implemented yet).
+ */
+public class ClientMessagePacker extends MessageBufferPacker {
+ /**
+ * Constructor. Packs into a new {@link ArrayBufferOutput} with the default packer config.
+ */
+ public ClientMessagePacker() {
+ // TODO: Pooled buffers IGNITE-15162.
+ super(new ArrayBufferOutput(), MessagePack.DEFAULT_PACKER_CONFIG);
+ }
+
+ /**
+ * Writes an UUID as a 16-byte extension payload: most significant bits first, then least significant bits.
+ *
+ * @param val UUID value.
+ * @return This instance.
+ * @throws IOException when underlying output throws IOException.
+ */
+ public ClientMessagePacker packUuid(UUID val) throws IOException {
+ packExtensionTypeHeader(ClientMsgPackType.UUID, 16);
+
+ var bytes = new byte[16];
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+
+ bb.putLong(val.getMostSignificantBits());
+ bb.putLong(val.getLeastSignificantBits());
+
+ writePayload(bytes);
+
+ return this;
+ }
+
+ /**
+ * Writes a decimal. Not implemented yet (IGNITE-15163): currently always throws.
+ *
+ * @param val Decimal value.
+ * @return This instance.
+ * @throws IOException always, until decimal support is implemented.
+ */
+ public ClientMessagePacker packDecimal(BigDecimal val) throws IOException {
+ throw new IOException("TODO: IGNITE-15163");
+ }
+
+ /**
+ * Writes a bit set. Not implemented yet (IGNITE-15163): currently always throws.
+ *
+ * @param val Bit set value.
+ * @return This instance.
+ * @throws IOException always, until bit set support is implemented.
+ */
+ public ClientMessagePacker packBitSet(BitSet val) throws IOException {
+ throw new IOException("TODO: IGNITE-15163");
+ }
+
+ /**
+ * Packs an object, choosing the format by its runtime type; {@code null} is written as nil.
+ *
+ * @param val Object value.
+ * @return This instance.
+ * @throws IOException when the type is not supported or underlying output throws IOException.
+ */
+ public ClientMessagePacker packObject(Object val) throws IOException {
+ if (val == null)
+ return (ClientMessagePacker) packNil();
+
+ if (val instanceof Integer)
+ return (ClientMessagePacker) packInt((int) val);
+
+ if (val instanceof Long)
+ return (ClientMessagePacker) packLong((long) val);
+
+ if (val instanceof UUID)
+ return packUuid((UUID) val);
+
+ if (val instanceof String)
+ return (ClientMessagePacker) packString((String) val);
+
+ if (val instanceof byte[]) {
+ byte[] bytes = (byte[]) val;
+ packBinaryHeader(bytes.length);
+ writePayload(bytes);
+
+ return this;
+ }
+
+ if (val instanceof BigDecimal)
+ return packDecimal((BigDecimal) val);
+
+ if (val instanceof BitSet)
+ return packBitSet((BitSet) val);
+
+ // TODO: Support all basic types IGNITE-15163
+ throw new IOException("Unsupported type, can't serialize: " + val.getClass());
+ }
+}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java
new file mode 100644
index 0000000..e8230e8
--- /dev/null
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.proto;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.UUID;
+
+import org.apache.ignite.lang.IgniteException;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageSizeException;
+import org.msgpack.core.MessageTypeException;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.core.buffer.MessageBufferInput;
+
+import static org.apache.ignite.client.proto.ClientDataType.BITMASK;
+import static org.apache.ignite.client.proto.ClientDataType.BYTES;
+import static org.apache.ignite.client.proto.ClientDataType.DECIMAL;
+import static org.apache.ignite.client.proto.ClientDataType.DOUBLE;
+import static org.apache.ignite.client.proto.ClientDataType.FLOAT;
+import static org.apache.ignite.client.proto.ClientDataType.INT16;
+import static org.apache.ignite.client.proto.ClientDataType.INT32;
+import static org.apache.ignite.client.proto.ClientDataType.INT64;
+import static org.apache.ignite.client.proto.ClientDataType.INT8;
+import static org.apache.ignite.client.proto.ClientDataType.STRING;
+
+/**
+ * MsgPack unpacker with Ignite-specific extensions (UUID; decimal and bit set are not implemented yet).
+ */
+public class ClientMessageUnpacker extends MessageUnpacker {
+ /**
+ * Constructor. Uses the default unpacker config.
+ *
+ * @param in Input.
+ */
+ public ClientMessageUnpacker(MessageBufferInput in) {
+ super(in, MessagePack.DEFAULT_UNPACKER_CONFIG);
+ }
+
+ /**
+ * Reads an UUID from a 16-byte extension payload: most significant bits first, then least significant bits.
+ *
+ * @return UUID value.
+ * @throws IOException when underlying input throws IOException.
+ * @throws MessageTypeException when type is not UUID.
+ * @throws MessageSizeException when size is not correct.
+ */
+ public UUID unpackUuid() throws IOException {
+ var hdr = unpackExtensionTypeHeader();
+ var type = hdr.getType();
+ var len = hdr.getLength();
+
+ if (type != ClientMsgPackType.UUID)
+ throw new MessageTypeException("Expected UUID extension (" + ClientMsgPackType.UUID + "), but got " + type);
+
+ if (len != 16)
+ throw new MessageSizeException("Expected 16 bytes for UUID extension, but got " + len, len);
+
+ var bytes = readPayload(16);
+
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+
+ return new UUID(bb.getLong(), bb.getLong());
+ }
+
+ /**
+ * Reads a decimal. Not implemented yet (IGNITE-15163): currently always throws.
+ *
+ * @return Decimal value.
+ * @throws IOException always, until decimal support is implemented.
+ */
+ public BigDecimal unpackDecimal() throws IOException {
+ throw new IOException("TODO: IGNITE-15163");
+ }
+
+ /**
+ * Reads a bit set. Not implemented yet (IGNITE-15163): currently always throws.
+ *
+ * @return Bit set.
+ * @throws IOException always, until bit set support is implemented.
+ */
+ public BitSet unpackBitSet() throws IOException {
+ throw new IOException("TODO: IGNITE-15163");
+ }
+
+ /**
+ * Unpacks an object based on the specified type; a nil value yields {@code null} for any type.
+ *
+ * @param dataType Data type code (one of the {@link ClientDataType} constants).
+ *
+ * @return Unpacked object.
+ * @throws IOException when underlying input throws IOException.
+ * @throws IgniteException when data type is not valid.
+ */
+ public Object unpackObject(int dataType) throws IOException {
+ if (tryUnpackNil())
+ return null;
+
+ switch (dataType) {
+ case INT8:
+ return unpackByte();
+
+ case INT16:
+ return unpackShort();
+
+ case INT32:
+ return unpackInt();
+
+ case INT64:
+ return unpackLong();
+
+ case FLOAT:
+ return unpackFloat();
+
+ case DOUBLE:
+ return unpackDouble();
+
+ case ClientDataType.UUID:
+ return unpackUuid();
+
+ case STRING:
+ return unpackString();
+
+ case BYTES:
+ var cnt = unpackBinaryHeader();
+
+ return readPayload(cnt);
+
+ case DECIMAL:
+ return unpackDecimal();
+
+ case BITMASK:
+ return unpackBitSet();
+ }
+
+ throw new IgniteException("Unknown client data type: " + dataType);
+ }
+}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMsgPackType.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMsgPackType.java
new file mode 100644
index 0000000..dd92c46
--- /dev/null
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMsgPackType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.proto;
+
+/**
+ * Ignite-specific extension type codes.
+ */
+public class ClientMsgPackType {
+ /** Number. */
+ public static final byte NUMBER = 1;
+
+ /** Decimal. */
+ public static final byte DECIMAL = 2;
+
+ /** UUID. */
+ public static final byte UUID = 3;
+
+ /** Date. */
+ public static final byte DATE = 4;
+
+ /** Time. */
+ public static final byte TIME = 5;
+
+ /** DateTime. */
+ public static final byte DATETIME = 6;
+}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientOp.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientOp.java
new file mode 100644
index 0000000..d7df3cd
--- /dev/null
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientOp.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.proto;
+
+/**
+ * Client operation codes.
+ */
+public class ClientOp {
+ /** Create table. */
+ public static final int TABLE_CREATE = 1;
+
+ /** Drop table. */
+ public static final int TABLE_DROP = 2;
+
+ /** Get tables. */
+ public static final int TABLES_GET = 3;
+
+ /** Get table. */
+ public static final int TABLE_GET = 4;
+
+ /** Get schemas. */
+ public static final int SCHEMAS_GET = 5;
+
+ /** Upsert tuple. */
+ public static final int TUPLE_UPSERT = 10;
+
+ /** Upsert tuple without schema. */
+ public static final int TUPLE_UPSERT_SCHEMALESS = 11;
+
+ /** Get tuple. */
+ public static final int TUPLE_GET = 12;
+}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ProtocolVersion.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ProtocolVersion.java
new file mode 100644
index 0000000..fda1a8b
--- /dev/null
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ProtocolVersion.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.proto;
+
+import java.io.IOException;
+
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Thin client protocol version.
+ * <p>
+ * Immutable triple of {@code major.minor.patch} parts. Instances are ordered by major, then minor, then patch,
+ * and that ordering is consistent with {@link #equals(Object)}.
+ */
+public final class ProtocolVersion implements Comparable<ProtocolVersion> {
+    /** Protocol version: 3.0.0. */
+    public static final ProtocolVersion V3_0_0 = new ProtocolVersion((short)3, (short)0, (short)0);
+
+    /** The most actual version. */
+    public static final ProtocolVersion LATEST_VER = V3_0_0;
+
+    /** Major. */
+    private final short major;
+
+    /** Minor. */
+    private final short minor;
+
+    /** Patch. */
+    private final short patch;
+
+    /**
+     * Constructor.
+     *
+     * @param major Major part.
+     * @param minor Minor part.
+     * @param patch Patch part.
+     */
+    public ProtocolVersion(short major, short minor, short patch) {
+        this.major = major;
+        this.minor = minor;
+        this.patch = patch;
+    }
+
+    /**
+     * Reads version from unpacker.
+     * <p>
+     * Three shorts are read in the order major, minor, patch (the same order {@link #pack} writes them).
+     *
+     * @param unpacker Unpacker.
+     * @return Version.
+     * @throws IOException when underlying input throws IOException.
+     */
+    public static ProtocolVersion unpack(ClientMessageUnpacker unpacker) throws IOException {
+        // Java evaluates arguments left to right, so the reads happen in major, minor, patch order.
+        return new ProtocolVersion(unpacker.unpackShort(), unpacker.unpackShort(), unpacker.unpackShort());
+    }
+
+    /**
+     * Writes this instance to the specified packer as three shorts: major, minor, patch.
+     *
+     * @param packer Packer.
+     * @throws IOException when underlying output throws IOException.
+     */
+    public void pack(ClientMessagePacker packer) throws IOException {
+        packer.packShort(major).packShort(minor).packShort(patch);
+    }
+
+    /**
+     * Gets the major part.
+     *
+     * @return Major.
+     */
+    public short major() {
+        return major;
+    }
+
+    /**
+     * Gets the minor part.
+     *
+     * @return Minor.
+     */
+    public short minor() {
+        return minor;
+    }
+
+    /**
+     * Gets the patch part.
+     *
+     * @return Patch.
+     */
+    public short patch() {
+        return patch;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (!(obj instanceof ProtocolVersion))
+            return false;
+
+        ProtocolVersion other = (ProtocolVersion)obj;
+
+        return major == other.major &&
+            minor == other.minor &&
+            patch == other.patch;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = 31 * major;
+
+        // Minor occupies the high 16 bits and patch the low 16 bits, so the two halves must be merged with OR.
+        // AND-ing these disjoint bit ranges is always 0, which would leave the hash depending on major only.
+        res += ((minor & 0xFFFF) << 16) | (patch & 0xFFFF);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(ProtocolVersion other) {
+        // Operands are shorts promoted to int, so the subtractions below cannot overflow.
+        int diff = major - other.major;
+
+        if (diff != 0)
+            return diff;
+
+        diff = minor - other.minor;
+
+        if (diff != 0)
+            return diff;
+
+        return patch - other.patch;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ProtocolVersion.class, this);
+    }
+}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ServerMessageType.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ServerMessageType.java
new file mode 100644
index 0000000..b368a93
--- /dev/null
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ServerMessageType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.proto;
+
+/**
+ * Server to client message types.
+ */
+public class ServerMessageType {
+ /** Response to a request (initiated by the client). */
+ public static final int RESPONSE = 0;
+
+ /** Notification (initiated by the server). */
+ public static final int NOTIFICATION = 1;
+}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/package-info.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/package-info.java
new file mode 100644
index 0000000..136a3aa
--- /dev/null
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Ignite thin client shared logic (client and server: serialization, op codes, etc).
+ */
+package org.apache.ignite.client.proto;
diff --git a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageDecoderTest.java b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageDecoderTest.java
new file mode 100644
index 0000000..9276102
--- /dev/null
+++ b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageDecoderTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.proto;
+
+import io.netty.buffer.Unpooled;
+import org.apache.ignite.client.proto.ClientMessageDecoder;
+import org.apache.ignite.lang.IgniteException;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Message decoding tests.
+ */
+public class ClientMessageDecoderTest {
+    /** An empty input buffer must not produce any decoded message. */
+    @Test
+    void testEmptyBufferReturnsNoResults() throws Exception {
+        var out = new ArrayList<>();
+        var emptyInput = Unpooled.wrappedBuffer(new byte[0]);
+
+        new ClientMessageDecoder().decode(null, emptyInput, out);
+
+        assertEquals(0, out.size());
+    }
+
+    /** Magic, size and payload delivered in one buffer yield exactly one message holding the payload. */
+    @Test
+    void testValidMagicAndMessageReturnsPayload() {
+        var decoder = new ClientMessageDecoder();
+        var out = new ArrayList<>();
+
+        decoder.decode(null, Unpooled.wrappedBuffer(getMagicWithPayload()), out);
+
+        assertEquals(1, out.size());
+        assertArrayEquals(new byte[]{33, 44}, ((ByteBuffer)out.get(0)).array());
+    }
+
+    /** A stream that starts with anything other than the expected magic is rejected. */
+    @Test
+    void testInvalidMagicThrowsException() {
+        // ASCII 'BEEF' followed by arbitrary bytes.
+        byte[] badInput = {66, 69, 69, 70, 1, 2, 3};
+
+        IgniteException err = assertThrows(
+            IgniteException.class,
+            () -> new ClientMessageDecoder().decode(null, Unpooled.wrappedBuffer(badInput), new ArrayList<>()));
+
+        assertEquals("Invalid magic header in thin client connection. Expected 'IGNI', but was 'BEEF'.",
+            err.getMessage());
+    }
+
+    /**
+     * Tests multipart buffer arrival: socket can split incoming stream into arbitrary chunks.
+     */
+    @Test
+    void testMultipartValidMagicAndMessageReturnsPayload() throws Exception {
+        var decoder = new ClientMessageDecoder();
+        var out = new ArrayList<>();
+
+        byte[] data = getMagicWithPayload();
+
+        // {offset, length} of the chunks that must NOT complete a message: magic, size, first payload byte.
+        int[][] incompleteChunks = {{0, 4}, {4, 4}, {8, 1}};
+
+        for (int[] chunk : incompleteChunks) {
+            decoder.decode(null, Unpooled.wrappedBuffer(data, chunk[0], chunk[1]), out);
+            assertEquals(0, out.size());
+        }
+
+        // The last payload byte completes the message.
+        decoder.decode(null, Unpooled.wrappedBuffer(data, 9, 1), out);
+        assertEquals(1, out.size());
+
+        assertArrayEquals(new byte[]{33, 44}, ((ByteBuffer)out.get(0)).array());
+    }
+
+    /**
+     * Builds a 10-byte input: 4 magic bytes, 4 message size bytes and the 2-byte payload {33, 44}.
+     *
+     * @return Input bytes.
+     */
+    private byte[] getMagicWithPayload() {
+        var msg = new byte[10];
+
+        // Magic occupies bytes 0..3.
+        System.arraycopy(ClientMessageDecoder.MAGIC_BYTES, 0, msg, 0, 4);
+
+        // Message size occupies bytes 4..7; only the last one is set (payload length 2).
+        msg[7] = 2;
+
+        // Payload occupies bytes 8..9.
+        msg[8] = 33;
+        msg[9] = 44;
+
+        return msg;
+    }
+}
diff --git a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageEncoderTest.java b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageEncoderTest.java
new file mode 100644
index 0000000..f07bb1f
--- /dev/null
+++ b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageEncoderTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.proto;
+
+import io.netty.buffer.Unpooled;
+import org.apache.ignite.client.proto.ClientMessageEncoder;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+/**
+ * Message encoding tests.
+ */
+public class ClientMessageEncoderTest {
+    /** Only the first message written by an encoder instance is prefixed with the magic bytes. */
+    @Test
+    public void testEncodeIncludesMagicWithFirstMessage() {
+        var encoder = new ClientMessageEncoder();
+
+        // First message: ASCII 'IGNI' magic, then size 2, then the payload.
+        assertArrayEquals(
+            new byte[]{0x49, 0x47, 0x4E, 0x49, 0, 0, 0, 2, 1, 2},
+            encode(new byte[]{1, 2}, encoder));
+
+        // Subsequent messages: size and payload only.
+        assertArrayEquals(
+            new byte[]{0, 0, 0, 3, 7, 8, 9},
+            encode(new byte[]{7, 8, 9}, encoder));
+
+        // Empty payload: just a zero size.
+        assertArrayEquals(
+            new byte[]{0, 0, 0, 0},
+            encode(new byte[0], encoder));
+    }
+
+    /**
+     * Encodes the array with the given encoder into a fresh buffer.
+     *
+     * @param array Payload.
+     * @param encoder Encoder (kept by the caller so that its state spans several calls).
+     * @return Bytes written by the encoder.
+     */
+    private byte[] encode(byte[] array, ClientMessageEncoder encoder) {
+        var sink = Unpooled.buffer(100);
+
+        encoder.encode(null, ByteBuffer.wrap(array), sink);
+
+        // Keep only what was actually written, not the whole backing array.
+        return Arrays.copyOfRange(sink.array(), 0, sink.writerIndex());
+    }
+}
diff --git a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java
new file mode 100644
index 0000000..fefb14d
--- /dev/null
+++ b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.proto;
+
+import org.apache.ignite.client.proto.ClientMessagePacker;
+import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.junit.jupiter.api.Test;
+import org.msgpack.core.buffer.ArrayBufferInput;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests Ignite-specific MsgPack extensions.
+ */
+public class ClientMessagePackerUnpackerTest {
+    /** UUID values must survive a pack/unpack round trip, including the all-zero UUID. */
+    @Test
+    public void testUUID() throws IOException {
+        testUUID(UUID.randomUUID());
+        testUUID(new UUID(0, 0));
+    }
+
+    /**
+     * Packs the value, unpacks it from the produced bytes and checks that the result equals the input.
+     *
+     * @param u Value to round-trip.
+     * @throws IOException If packing or unpacking fails.
+     */
+    private void testUUID(UUID u) throws IOException {
+        var writer = new ClientMessagePacker();
+        writer.packUuid(u);
+
+        byte[] packed = writer.toByteArray();
+        var reader = new ClientMessageUnpacker(new ArrayBufferInput(packed));
+
+        assertEquals(u, reader.unpackUuid());
+    }
+}
diff --git a/modules/client-handler/README.md b/modules/client-handler/README.md
new file mode 100644
index 0000000..73c9160
--- /dev/null
+++ b/modules/client-handler/README.md
@@ -0,0 +1,4 @@
+# Ignite Client Handler: server side of thin client
+
+This module contains the server-side handler for the thin client protocol.
+
diff --git a/modules/client-handler/pom.xml b/modules/client-handler/pom.xml
new file mode 100644
index 0000000..4c7eaff
--- /dev/null
+++ b/modules/client-handler/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>ignite-client-handler</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-client-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration-annotation-processor</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-table</artifactId>
+ </dependency>
+
+ <!-- 3rd party dependencies -->
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>jackson-dataformat-msgpack</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientContext.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientContext.java
new file mode 100644
index 0000000..a795dfe
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientContext.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler;
+
+import java.util.BitSet;
+
+import org.apache.ignite.client.proto.ProtocolVersion;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Client connection context.
+ * <p>
+ * Holder for the protocol version, client type code and feature set passed to the constructor.
+ * All fields are final and are assigned once in the constructor.
+ */
+class ClientContext {
+ /** Protocol version passed to the constructor. */
+ private final ProtocolVersion version;
+
+ /** Client type code. */
+ private final int clientCode;
+
+ /** Feature set. The instance passed to the constructor is stored as is (not copied). */
+ private final BitSet features;
+
+ /**
+ * Constructor.
+ *
+ * @param version Version.
+ * @param clientCode Client type code.
+ * @param features Feature set. Stored by reference, so later changes to the passed bit set are visible here.
+ */
+ ClientContext(ProtocolVersion version, int clientCode, BitSet features) {
+ this.version = version;
+ this.clientCode = clientCode;
+ this.features = features;
+ }
+
+ /**
+ * Gets the protocol version.
+ *
+ * @return Protocol version.
+ */
+ public ProtocolVersion version() {
+ return version;
+ }
+
+ /**
+ * Gets the client code.
+ *
+ * @return Client code.
+ */
+ public int clientCode() {
+ return clientCode;
+ }
+
+ /**
+ * Gets the features.
+ *
+ * @return Features. This is the internal bit set itself, not a copy.
+ */
+ public BitSet features() {
+ return features;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ClientContext.class, this);
+ }
+}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
new file mode 100644
index 0000000..18791f8
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler;
+
+import java.net.BindException;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.client.proto.ClientMessageDecoder;
+import org.apache.ignite.client.proto.ClientMessageEncoder;
+import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
+import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.slf4j.Logger;
+
+/**
+ * Client handler module maintains TCP endpoint for thin client connections.
+ * <p>
+ * {@link #prepareStart(ConfigurationRegistry)} must be called before {@link #start()}: the endpoint reads its
+ * port settings from the registry supplied there.
+ */
+public class ClientHandlerModule {
+    /** Port used when the configuration does not provide one. */
+    private static final int DFLT_PORT = 10800;
+
+    /** Number of consecutive ports to try when the configuration does not provide a range. */
+    private static final int DFLT_PORT_RANGE = 100;
+
+    /** Configuration registry. */
+    private ConfigurationRegistry registry;
+
+    /** Ignite API entry point. */
+    private final Ignite ignite;
+
+    /** Logger. */
+    private final Logger log;
+
+    /**
+     * Constructor.
+     *
+     * @param ignite Ignite.
+     * @param log Logger.
+     */
+    public ClientHandlerModule(Ignite ignite, Logger log) {
+        this.ignite = ignite;
+        this.log = log;
+    }
+
+    /**
+     * Prepares to start the module.
+     *
+     * @param sysCfg Configuration registry.
+     */
+    public void prepareStart(ConfigurationRegistry sysCfg) {
+        registry = sysCfg;
+    }
+
+    /**
+     * Starts the module.
+     *
+     * @return Close future of the bound server channel.
+     * @throws InterruptedException If thread has been interrupted during the start.
+     */
+    public ChannelFuture start() throws InterruptedException {
+        return startEndpoint();
+    }
+
+    /**
+     * Starts the endpoint.
+     * <p>
+     * Tries to bind to each port in {@code [desiredPort, desiredPort + portRange)} in ascending order and keeps
+     * the first one that succeeds. A bind failure caused by {@link BindException} moves on to the next port; any
+     * other bind failure aborts the start. The event loop group is shut down on every failure path and when the
+     * bound channel is closed.
+     *
+     * @return Close future of the bound server channel.
+     * @throws InterruptedException If thread has been interrupted during the start.
+     * @throws IgniteException When startup has failed.
+     */
+    private ChannelFuture startEndpoint() throws InterruptedException {
+        var configuration = registry.getConfiguration(ClientConnectorConfiguration.KEY);
+
+        // TODO: Handle defaults IGNITE-15164.
+        // Each value is read once so that the null check and the actual use see the same value.
+        var cfgPort = configuration.port().value();
+        var cfgPortRange = configuration.portRange().value();
+
+        int desiredPort = cfgPort == null ? DFLT_PORT : cfgPort;
+        int portRange = cfgPortRange == null ? DFLT_PORT_RANGE : cfgPortRange;
+
+        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+
+        ServerBootstrap b = new ServerBootstrap();
+
+        b.group(eventLoopGroup)
+            .channel(NioServerSocketChannel.class)
+            .childHandler(new ChannelInitializer<>() {
+                @Override
+                protected void initChannel(Channel ch) {
+                    ch.pipeline().addLast(
+                        new ClientMessageDecoder(),
+                        new ClientMessageEncoder(),
+                        new ClientInboundMessageHandler(ignite, log));
+                }
+            })
+            .childOption(ChannelOption.SO_KEEPALIVE, true)
+            .childOption(ChannelOption.TCP_NODELAY, true);
+
+        int port = 0;
+
+        Channel ch = null;
+
+        try {
+            for (int portCandidate = desiredPort; portCandidate < desiredPort + portRange; portCandidate++) {
+                ChannelFuture bindRes = b.bind(portCandidate).await();
+
+                if (bindRes.isSuccess()) {
+                    ch = bindRes.channel();
+
+                    // Release the event loop threads once the server channel is closed.
+                    ch.closeFuture().addListener((ChannelFutureListener) fut -> eventLoopGroup.shutdownGracefully());
+
+                    port = portCandidate;
+                    break;
+                }
+                else if (!(bindRes.cause() instanceof BindException)) {
+                    // Not a "port is busy" failure: trying other ports will not help.
+                    eventLoopGroup.shutdownGracefully();
+
+                    throw new IgniteException(bindRes.cause());
+                }
+            }
+        }
+        catch (InterruptedException e) {
+            // Interrupted while waiting for a bind: no channel owns the group yet, so release it here.
+            eventLoopGroup.shutdownGracefully();
+
+            throw e;
+        }
+
+        if (ch == null) {
+            // The loop's upper bound is exclusive, hence "- 1" for the inclusive range in the message.
+            String msg = "Cannot start thin client connector endpoint. " +
+                "All ports in range [" + desiredPort + ", " + (desiredPort + portRange - 1) + "] are in use.";
+
+            log.error(msg);
+
+            eventLoopGroup.shutdownGracefully();
+
+            throw new IgniteException(msg);
+        }
+
+        log.info("Thin client connector started successfully on port " + port);
+
+        return ch.closeFuture();
+    }
+}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
new file mode 100644
index 0000000..be18616
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.client.proto.ClientDataType;
+import org.apache.ignite.client.proto.ClientErrorCode;
+import org.apache.ignite.client.proto.ClientMessagePacker;
+import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.client.proto.ClientOp;
+import org.apache.ignite.client.proto.ProtocolVersion;
+import org.apache.ignite.client.proto.ServerMessageType;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaAware;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TupleBuilderImpl;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.msgpack.core.MessageFormat;
+import org.msgpack.core.buffer.ByteBufferInput;
+import org.slf4j.Logger;
+
+/**
+ * Handles messages from thin clients.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
+ /** Logger. */
+ private final Logger log;
+
+ /** API entry point. */
+ private final Ignite ignite;
+
+ /** Context. */
+ private ClientContext clientContext;
+
+ /**
+  * Creates a message handler bound to the given Ignite instance.
+  *
+  * @param ignite Ignite API entry point that client operations are executed against.
+  * @param log Logger.
+  */
+ public ClientInboundMessageHandler(Ignite ignite, Logger log) {
+     // Both collaborators are mandatory; the checks are active only when assertions are enabled.
+     assert ignite != null;
+     assert log != null;
+
+     this.log = log;
+     this.ignite = ignite;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
+ var buf = (ByteBuffer) msg;
+
+ var unpacker = getUnpacker(buf);
+ var packer = getPacker();
+
+ if (clientContext == null)
+ handshake(ctx, unpacker, packer);
+ else
+ processOperation(ctx, unpacker, packer);
+ }
+
+ /**
+  * Reads the client handshake and replies with the server handshake.
+  *
+  * <p>On success the client context is initialized, so subsequent messages are processed as operations.
+  * On any failure an error response (protocol version, {@code ClientErrorCode.FAILED}, error message) is sent
+  * and the client context is left unset.
+  *
+  * @param ctx Channel handler context used to send the response.
+  * @param unpacker Unpacker with the handshake request.
+  * @param packer Packer for the success response.
+  * @throws IOException If the error response can not be serialized.
+  */
+ private void handshake(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer)
+     throws IOException {
+     try {
+         var clientVer = ProtocolVersion.unpack(unpacker);
+
+         if (!clientVer.equals(ProtocolVersion.LATEST_VER))
+             throw new IgniteException("Unsupported version: " +
+                 clientVer.major() + "." + clientVer.minor() + "." + clientVer.patch());
+
+         var clientCode = unpacker.unpackInt();
+         var featuresLen = unpacker.unpackBinaryHeader();
+         var features = BitSet.valueOf(unpacker.readPayload(featuresLen));
+
+         clientContext = new ClientContext(clientVer, clientCode, features);
+
+         log.debug("Handshake: " + clientContext);
+
+         // Extensions are read and discarded. A MsgPack map header holds the number of key-value pairs,
+         // so two values have to be skipped per entry.
+         var extensionsLen = unpacker.unpackMapHeader();
+         unpacker.skipValue(extensionsLen * 2);
+
+         // Response.
+         ProtocolVersion.LATEST_VER.pack(packer);
+
+         packer.packInt(ClientErrorCode.SUCCESS)
+             .packBinaryHeader(0) // Features.
+             .packMapHeader(0); // Extensions.
+
+         write(packer, ctx);
+     }
+     catch (Throwable t) {
+         // A failed handshake must not leave the context behind: channelRead would dispatch
+         // the next message as an operation.
+         clientContext = null;
+
+         // Start from a fresh packer: the original one may contain a partially written response.
+         packer = getPacker();
+
+         // The message can be null (e.g. for NullPointerException); packString fails on a null argument.
+         String errMsg = t.getMessage();
+
+         if (errMsg == null)
+             errMsg = t.getClass().getName();
+
+         ProtocolVersion.LATEST_VER.pack(packer);
+         packer.packInt(ClientErrorCode.FAILED).packString(errMsg);
+
+         write(packer, ctx);
+     }
+ }
+
+ /**
+  * Sends the packed message to the client and flushes the channel.
+  *
+  * @param packer Packer with the complete message.
+  * @param ctx Channel handler context.
+  */
+ private void write(ClientMessagePacker packer, ChannelHandlerContext ctx) {
+     ctx.writeAndFlush(packer.toMessageBuffer().sliceAsByteBuffer());
+ }
+
+ /**
+  * Sends an error response for the given request.
+  *
+  * <p>The response consists of the message type, the request id, {@code ClientErrorCode.FAILED} and the error
+  * message (the exception class name is used when the exception has no message). Failures while building
+  * or sending the response are passed to {@link #exceptionCaught}.
+  *
+  * @param requestId Id of the failed request.
+  * @param err Error.
+  * @param ctx Channel handler context.
+  */
+ private void writeError(int requestId, Throwable err, ChannelHandlerContext ctx) {
+     try {
+         assert err != null;
+
+         ClientMessagePacker errPacker = getPacker();
+
+         errPacker.packInt(ServerMessageType.RESPONSE);
+         errPacker.packInt(requestId);
+         errPacker.packInt(ClientErrorCode.FAILED);
+
+         String text = err.getMessage();
+
+         errPacker.packString(text != null ? text : err.getClass().getName());
+
+         write(errPacker, ctx);
+     }
+     catch (Throwable t) {
+         exceptionCaught(ctx, t);
+     }
+ }
+
+ /**
+  * Creates a packer for an outgoing message.
+  *
+  * @return New packer instance.
+  */
+ private ClientMessagePacker getPacker() {
+     return new ClientMessagePacker();
+ }
+
+ /**
+  * Creates an unpacker that reads from the given buffer.
+  *
+  * @param buf Buffer with an incoming message.
+  * @return New unpacker instance.
+  */
+ private ClientMessageUnpacker getUnpacker(ByteBuffer buf) {
+     var input = new ByteBufferInput(buf);
+
+     return new ClientMessageUnpacker(input);
+ }
+
+ /**
+  * Reads the request header (operation code and request id), executes the operation and sends the response.
+  *
+  * <p>The success response header is written to the packer up front; the operation appends its result to it.
+  * If the operation fails, synchronously or asynchronously, a separate error response is sent instead.
+  *
+  * @param ctx Channel handler context.
+  * @param unpacker Unpacker with the request.
+  * @param packer Packer for the success response.
+  * @throws IOException If the request header can not be read or the response header can not be written.
+  */
+ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer) throws IOException {
+     int opCode = unpacker.unpackInt();
+     int requestId = unpacker.unpackInt();
+
+     packer.packInt(ServerMessageType.RESPONSE);
+     packer.packInt(requestId);
+     packer.packInt(ClientErrorCode.SUCCESS);
+
+     try {
+         CompletableFuture pending = processOperation(unpacker, packer, opCode);
+
+         if (pending != null) {
+             // Asynchronous operation: respond once the future completes.
+             pending.whenComplete((Object res, Object err) -> {
+                 if (err == null)
+                     write(packer, ctx);
+                 else
+                     writeError(requestId, (Throwable) err, ctx);
+             });
+
+             return;
+         }
+
+         // Operation completed synchronously.
+         write(packer, ctx);
+     }
+     catch (Throwable t) {
+         writeError(requestId, t, ctx);
+     }
+ }
+
+ /**
+  * Executes a single client operation, appending its result (if any) to the response packer.
+  *
+  * @param unpacker Unpacker positioned at the operation arguments.
+  * @param packer Response packer.
+  * @param opCode Operation code, see {@link ClientOp}.
+  * @return Future of an asynchronous operation, or {@code null} if the operation has completed synchronously.
+  * @throws IOException If the arguments can not be read or the result can not be written.
+  */
+ private CompletableFuture processOperation(ClientMessageUnpacker unpacker, ClientMessagePacker packer, int opCode)
+     throws IOException {
+     // TODO: Handle all operations asynchronously (add async table API).
+     switch (opCode) {
+         case ClientOp.TABLE_DROP: {
+             var name = unpacker.unpackString();
+
+             ignite.tables().dropTable(name);
+
+             return null;
+         }
+
+         case ClientOp.TABLES_GET: {
+             List<Table> allTables = ignite.tables().tables();
+
+             // Result is a map: table id -> table name.
+             packer.packMapHeader(allTables.size());
+
+             for (Table tbl : allTables) {
+                 packer.packUuid(((TableImpl) tbl).tableId());
+                 packer.packString(tbl.tableName());
+             }
+
+             return null;
+         }
+
+         case ClientOp.SCHEMAS_GET: {
+             var tbl = readTable(unpacker);
+
+             if (unpacker.getNextFormat() != MessageFormat.NIL) {
+                 // An explicit list of schema versions is requested.
+                 var verCnt = unpacker.unpackArrayHeader();
+
+                 packer.packMapHeader(verCnt);
+
+                 for (var idx = 0; idx < verCnt; idx++) {
+                     var ver = unpacker.unpackInt();
+                     var requested = tbl.schemaView().schema(ver);
+
+                     writeSchema(packer, ver, requested);
+                 }
+
+                 return null;
+             }
+
+             // Nil instead of the version list: return the latest schema.
+             packer.packMapHeader(1);
+
+             var latest = tbl.schemaView().schema();
+
+             if (latest == null)
+                 throw new IgniteException("Schema registry is not initialized.");
+
+             writeSchema(packer, latest.version(), latest);
+
+             return null;
+         }
+
+         case ClientOp.TABLE_GET: {
+             String name = unpacker.unpackString();
+             Table tbl = ignite.tables().table(name);
+
+             // Nil tells the client that there is no such table.
+             if (tbl != null)
+                 packer.packUuid(((TableImpl) tbl).tableId());
+             else
+                 packer.packNil();
+
+             return null;
+         }
+
+         case ClientOp.TUPLE_UPSERT: {
+             var tbl = readTable(unpacker);
+             var rec = readTuple(unpacker, tbl, false);
+
+             return tbl.upsertAsync(rec);
+         }
+
+         case ClientOp.TUPLE_UPSERT_SCHEMALESS: {
+             var tbl = readTable(unpacker);
+             var rec = readTupleSchemaless(unpacker, tbl);
+
+             return tbl.upsertAsync(rec);
+         }
+
+         case ClientOp.TUPLE_GET: {
+             var tbl = readTable(unpacker);
+             var key = readTuple(unpacker, tbl, true);
+
+             // The found tuple (or nil) is appended to the response when the lookup completes.
+             return tbl.getAsync(key).thenAccept(t -> writeTuple(packer, t));
+         }
+
+         default:
+             throw new IgniteException("Unexpected operation code: " + opCode);
+     }
+ }
+
+ /**
+  * Writes a schema as a map entry: the version followed by an array of columns, or by nil if there is no schema.
+  *
+  * <p>Every column is an array of 4 elements: name, client data type, key flag, nullable flag.
+  *
+  * @param packer Packer.
+  * @param schemaVer Schema version.
+  * @param schema Schema, can be {@code null}.
+  * @throws IOException If writing fails.
+  */
+ private void writeSchema(ClientMessagePacker packer, int schemaVer, SchemaDescriptor schema) throws IOException {
+     packer.packInt(schemaVer);
+
+     if (schema == null) {
+         packer.packNil();
+
+         return;
+     }
+
+     int total = schema.columnNames().size();
+
+     packer.packArrayHeader(total);
+
+     for (int i = 0; i < total; i++) {
+         Column column = schema.column(i);
+
+         packer.packArrayHeader(4);
+         packer.packString(column.name());
+         packer.packInt(getClientDataType(column.type().spec()));
+         packer.packBoolean(schema.isKeyColumn(i));
+         packer.packBoolean(column.nullable());
+     }
+ }
+
+ /**
+  * Writes a tuple: the schema version followed by the values of key columns and then of value columns.
+  * Nil is written for a {@code null} tuple.
+  *
+  * @param packer Packer.
+  * @param tuple Tuple, can be {@code null}.
+  * @throws IgniteException If serialization fails for any reason.
+  */
+ private void writeTuple(ClientMessagePacker packer, Tuple tuple) {
+     try {
+         if (tuple != null) {
+             var tupleSchema = ((SchemaAware) tuple).schema();
+
+             packer.packInt(tupleSchema.version());
+
+             for (var keyCol : tupleSchema.keyColumns().columns())
+                 writeColumnValue(packer, tuple, keyCol);
+
+             for (var valCol : tupleSchema.valueColumns().columns())
+                 writeColumnValue(packer, tuple, valCol);
+         }
+         else
+             packer.packNil();
+     }
+     catch (Throwable t) {
+         throw new IgniteException("Failed to serialize tuple", t);
+     }
+ }
+
+ /**
+  * Reads a tuple written according to a known schema: the schema version followed by column values
+  * in schema order. Nil values are skipped, the corresponding columns are not set.
+  *
+  * @param unpacker Unpacker.
+  * @param table Table the tuple belongs to.
+  * @param keyOnly When {@code true}, only key columns are read.
+  * @return Tuple.
+  * @throws IOException If reading fails.
+  * @throws IgniteException If the table has no schema of the requested version.
+  */
+ private Tuple readTuple(ClientMessageUnpacker unpacker, TableImpl table, boolean keyOnly) throws IOException {
+     var schemaId = unpacker.unpackInt();
+     var schema = table.schemaView().schema(schemaId);
+
+     // Fail with a descriptive error instead of a NullPointerException when the client refers to an unknown schema.
+     if (schema == null)
+         throw new IgniteException("Schema not found: " + schemaId);
+
+     var builder = (TupleBuilderImpl) table.tupleBuilder();
+
+     var cnt = keyOnly ? schema.keyColumns().length() : schema.length();
+
+     for (int i = 0; i < cnt; i++) {
+         if (unpacker.getNextFormat() == MessageFormat.NIL) {
+             unpacker.skipValue();
+             continue;
+         }
+
+         readAndSetColumnValue(unpacker, builder, schema.column(i));
+     }
+
+     return builder.build();
+ }
+
+ /**
+  * Reads a tuple written without a schema, as a map of column names to values.
+  *
+  * @param unpacker Unpacker.
+  * @param table Table the tuple belongs to.
+  * @return Tuple.
+  * @throws IOException If reading fails.
+  */
+ private Tuple readTupleSchemaless(ClientMessageUnpacker unpacker, TableImpl table) throws IOException {
+     var entries = unpacker.unpackMapHeader();
+     var tupleBuilder = table.tupleBuilder();
+
+     for (int entry = 0; entry < entries; entry++) {
+         var name = unpacker.unpackString();
+         var val = unpacker.unpackValue();
+
+         tupleBuilder.set(name, val);
+     }
+
+     return tupleBuilder.build();
+ }
+
+ /**
+  * Reads a table id and resolves the table.
+  *
+  * @param unpacker Unpacker.
+  * @return Table, never {@code null}.
+  * @throws IOException If reading fails.
+  * @throws IgniteException If there is no table with the given id.
+  */
+ private TableImpl readTable(ClientMessageUnpacker unpacker) throws IOException {
+     var tableId = unpacker.unpackUuid();
+     var table = ((IgniteTablesInternal)ignite.tables()).table(tableId);
+
+     // NOTE(review): assumes the lookup returns null for an unknown id (as the lookup by name does) - confirm.
+     // Callers dereference the result right away, so report a meaningful error instead of a NullPointerException.
+     if (table == null)
+         throw new IgniteException("Table does not exist: " + tableId);
+
+     return table;
+ }
+
+ /**
+  * Reads the next value according to the column type and sets it on the tuple builder.
+  *
+  * @param unpacker Unpacker.
+  * @param builder Tuple builder.
+  * @param col Column the value belongs to.
+  * @throws IOException If reading fails.
+  */
+ private void readAndSetColumnValue(ClientMessageUnpacker unpacker, TupleBuilderImpl builder, Column col)
+     throws IOException {
+     var colName = col.name();
+     var colVal = unpacker.unpackObject(getClientDataType(col.type().spec()));
+
+     builder.set(colName, colVal);
+ }
+
+ /**
+  * Maps a native type to the corresponding {@link ClientDataType} code.
+  *
+  * @param spec Native type.
+  * @return Client data type code.
+  * @throws IgniteException If the native type has no client counterpart.
+  */
+ private static int getClientDataType(NativeTypeSpec spec) {
+     switch (spec) {
+         case INT8:
+             return ClientDataType.INT8;
+
+         case INT16:
+             return ClientDataType.INT16;
+
+         case INT32:
+             return ClientDataType.INT32;
+
+         case INT64:
+             return ClientDataType.INT64;
+
+         case FLOAT:
+             return ClientDataType.FLOAT;
+
+         case DOUBLE:
+             return ClientDataType.DOUBLE;
+
+         case DECIMAL:
+             return ClientDataType.DECIMAL;
+
+         case UUID:
+             return ClientDataType.UUID;
+
+         case STRING:
+             return ClientDataType.STRING;
+
+         case BYTES:
+             return ClientDataType.BYTES;
+
+         case BITMASK:
+             return ClientDataType.BITMASK;
+
+         default:
+             throw new IgniteException("Unsupported native type: " + spec);
+     }
+ }
+
+ /**
+  * Writes the value of the given column, using the representation that matches the column type.
+  * Nil is written when the tuple has no value for the column.
+  *
+  * @param packer Packer.
+  * @param tuple Tuple to take the value from.
+  * @param col Column.
+  * @throws IOException If writing fails.
+  * @throws IgniteException If the column type is not supported.
+  */
+ private void writeColumnValue(ClientMessagePacker packer, Tuple tuple, Column col) throws IOException {
+     Object colVal = tuple.value(col.name());
+
+     if (colVal == null) {
+         packer.packNil();
+
+         return;
+     }
+
+     switch (col.type().spec()) {
+         case INT8:
+             packer.packByte((byte) colVal);
+             return;
+
+         case INT16:
+             packer.packShort((short) colVal);
+             return;
+
+         case INT32:
+             packer.packInt((int) colVal);
+             return;
+
+         case INT64:
+             packer.packLong((long) colVal);
+             return;
+
+         case FLOAT:
+             packer.packFloat((float) colVal);
+             return;
+
+         case DOUBLE:
+             packer.packDouble((double) colVal);
+             return;
+
+         case DECIMAL:
+             packer.packDecimal((BigDecimal) colVal);
+             return;
+
+         case UUID:
+             packer.packUuid((UUID) colVal);
+             return;
+
+         case STRING:
+             packer.packString((String) colVal);
+             return;
+
+         case BYTES: {
+             // Binary values are written as a length header followed by the raw bytes.
+             byte[] payload = (byte[]) colVal;
+
+             packer.packBinaryHeader(payload.length);
+             packer.writePayload(payload);
+             return;
+         }
+
+         case BITMASK:
+             packer.packBitSet((BitSet)colVal);
+             return;
+
+         default:
+             throw new IgniteException("Data type not supported: " + col.type());
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void channelReadComplete(ChannelHandlerContext ctx) {
+     // Flush pending writes once the current read batch has been processed.
+     ctx.flush();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ log.error(cause.getMessage(), cause);
+
+ ctx.close();
+ }
+}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/package-info.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/package-info.java
new file mode 100644
index 0000000..0da863f
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Ignite thin client handler (server-side connector).
+ */
+package org.apache.ignite.client.handler;
diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java
new file mode 100644
index 0000000..fa73707
--- /dev/null
+++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Collections;
+
+import io.netty.channel.ChannelFuture;
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
+import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.msgpack.core.MessagePack;
+import org.slf4j.helpers.NOPLogger;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Client connector integration tests with real sockets.
+ */
+public class ClientHandlerIntegrationTest {
+    /** Magic bytes. */
+    private static final byte[] MAGIC = new byte[]{0x49, 0x47, 0x4E, 0x49};
+
+    /** Future returned by the client handler module on start. */
+    private ChannelFuture serverFuture;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        serverFuture = startServer();
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        serverFuture.cancel(true);
+        serverFuture.await();
+    }
+
+    @Test
+    void testHandshakeInvalidMagicHeaderDropsConnection() throws Exception {
+        try (var sock = new Socket("127.0.0.1", 10800)) {
+            OutputStream out = sock.getOutputStream();
+            out.write(new byte[]{63, 64, 65, 66, 67});
+            out.flush();
+
+            // Writes start to fail once the server has dropped the connection.
+            assertThrows(IOException.class, () -> writeAndFlushLoop(sock));
+        }
+    }
+
+    @Test
+    void testHandshakeValidReturnsSuccess() throws Exception {
+        try (var sock = new Socket("127.0.0.1", 10800)) {
+            writeHandshake(sock.getOutputStream(), 3, 0, 0);
+
+            // Read response.
+            var unpacker = MessagePack.newDefaultUnpacker(sock.getInputStream());
+            var magic = unpacker.readPayload(4);
+            unpacker.skipValue(3); // Three leading zero bytes of the 4-byte length, each parses as a MsgPack int 0.
+            var len = unpacker.unpackInt();
+            var major = unpacker.unpackInt();
+            var minor = unpacker.unpackInt();
+            var patch = unpacker.unpackInt();
+            var errorCode = unpacker.unpackInt();
+
+            // Binary payload is raw bytes, not MsgPack values, so it is consumed with readPayload.
+            var featuresLen = unpacker.unpackBinaryHeader();
+            unpacker.readPayload(featuresLen);
+
+            // Map header holds the number of key-value pairs: two values per entry.
+            var extensionsLen = unpacker.unpackMapHeader();
+            unpacker.skipValue(extensionsLen * 2);
+
+            assertArrayEquals(MAGIC, magic);
+            assertEquals(7, len);
+            assertEquals(3, major);
+            assertEquals(0, minor);
+            assertEquals(0, patch);
+            assertEquals(0, errorCode);
+        }
+    }
+
+    @Test
+    void testHandshakeInvalidVersionReturnsError() throws Exception {
+        try (var sock = new Socket("127.0.0.1", 10800)) {
+            writeHandshake(sock.getOutputStream(), 2, 8, 0);
+
+            // Read response.
+            var unpacker = MessagePack.newDefaultUnpacker(sock.getInputStream());
+            var magic = unpacker.readPayload(4);
+            unpacker.skipValue(3); // Three leading zero bytes of the 4-byte length, each parses as a MsgPack int 0.
+            var len = unpacker.unpackInt();
+            var major = unpacker.unpackInt();
+            var minor = unpacker.unpackInt();
+            var patch = unpacker.unpackInt();
+            var errorCode = unpacker.unpackInt();
+            var err = unpacker.unpackString();
+
+            assertArrayEquals(MAGIC, magic);
+            assertEquals(31, len);
+            assertEquals(3, major);
+            assertEquals(0, minor);
+            assertEquals(0, patch);
+            assertEquals(1, errorCode);
+            assertEquals("Unsupported version: 2.8.0", err);
+        }
+    }
+
+    /**
+     * Sends the magic bytes followed by a handshake request of a general purpose client
+     * with no features and no extensions.
+     *
+     * <p>The hard-coded message size (7) is valid only while every version component is packed
+     * as a single byte, i.e. is in the range [0, 127].
+     *
+     * @param out Socket output stream.
+     * @param major Major protocol version.
+     * @param minor Minor protocol version.
+     * @param patch Patch protocol version.
+     * @throws IOException If writing fails.
+     */
+    private static void writeHandshake(OutputStream out, int major, int minor, int patch) throws IOException {
+        // Magic: IGNI
+        out.write(MAGIC);
+
+        // Handshake.
+        var packer = MessagePack.newDefaultBufferPacker();
+        packer.packInt(0);
+        packer.packInt(0);
+        packer.packInt(0);
+        packer.packInt(7); // Size.
+
+        packer.packInt(major); // Major.
+        packer.packInt(minor); // Minor.
+        packer.packInt(patch); // Patch.
+
+        packer.packInt(2); // Client type: general purpose.
+
+        packer.packBinaryHeader(0); // Features.
+        packer.packMapHeader(0); // Extensions.
+
+        out.write(packer.toByteArray());
+        out.flush();
+    }
+
+    /**
+     * Starts the client handler module with an empty local configuration and a mocked Ignite instance.
+     *
+     * @return Future returned by the module on start.
+     * @throws InterruptedException If interrupted while starting.
+     */
+    private ChannelFuture startServer() throws InterruptedException {
+        var registry = new ConfigurationRegistry(
+            Collections.singletonList(ClientConnectorConfiguration.KEY),
+            Collections.emptyMap(),
+            Collections.singletonList(new TestConfigurationStorage(ConfigurationType.LOCAL))
+        );
+
+        var module = new ClientHandlerModule(mock(Ignite.class), NOPLogger.NOP_LOGGER);
+
+        module.prepareStart(registry);
+
+        return module.start();
+    }
+
+    /**
+     * Keeps writing single bytes to the socket for up to 5 seconds.
+     *
+     * @param socket Socket.
+     * @throws Exception If a write or flush fails.
+     */
+    private void writeAndFlushLoop(Socket socket) throws Exception {
+        var stop = System.currentTimeMillis() + 5000;
+        var out = socket.getOutputStream();
+
+        while (System.currentTimeMillis() < stop) {
+            out.write(1);
+            out.flush();
+        }
+    }
+}
diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/TestConfigurationStorage.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/TestConfigurationStorage.java
new file mode 100644
index 0000000..e170b7f
--- /dev/null
+++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/TestConfigurationStorage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
+import org.apache.ignite.internal.configuration.storage.ConfigurationStorageListener;
+import org.apache.ignite.internal.configuration.storage.Data;
+import org.apache.ignite.internal.configuration.storage.StorageException;
+
+/**
+ * Configuration storage for tests: holds no data and immediately notifies listeners about every write.
+ */
+public class TestConfigurationStorage implements ConfigurationStorage {
+    /** Configuration type. */
+    private final ConfigurationType type;
+
+    /** Registered listeners. */
+    private final Set<ConfigurationStorageListener> listeners = new HashSet<>();
+
+    /**
+     * @param type Configuration type.
+     */
+    public TestConfigurationStorage(ConfigurationType type) {
+        this.type = type;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Data readAll() throws StorageException {
+        // The storage is always empty, version 0.
+        return new Data(Collections.emptyMap(), 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
+        // Nothing is stored: the new values are only propagated to listeners with the next version.
+        listeners.forEach(lsnr -> lsnr.onEntriesChanged(new Data(newValues, version + 1)));
+
+        return CompletableFuture.completedFuture(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void registerConfigurationListener(ConfigurationStorageListener listener) {
+        listeners.add(listener);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void notifyApplied(long storageRevision) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public ConfigurationType type() {
+        return type;
+    }
+}
diff --git a/modules/client/README.md b/modules/client/README.md
new file mode 100644
index 0000000..0e81cf8
--- /dev/null
+++ b/modules/client/README.md
@@ -0,0 +1,4 @@
+# Ignite Client module
+
+This module contains the Ignite Java thin client.
+
diff --git a/modules/client/pom.xml b/modules/client/pom.xml
new file mode 100644
index 0000000..05c2d0c
--- /dev/null
+++ b/modules/client/pom.xml
@@ -0,0 +1,132 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>ignite-client</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-client-common</artifactId>
+ </dependency>
+
+ <!-- 3rd party dependencies -->
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>jackson-dataformat-msgpack</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-client-handler</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
new file mode 100644
index 0000000..7c6e734
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.internal.client.TcpIgniteClient;
+
+/**
+ * Ignite client entry point.
+ */
+public class IgniteClient {
+    /**
+     * Gets a new client builder.
+     *
+     * @return New client builder.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Client builder. */
+    public static class Builder {
+        /** Addresses. */
+        private String[] addresses;
+
+        /**
+         * Builds the client, blocking until it is ready.
+         *
+         * @return Ignite client.
+         */
+        public Ignite build() {
+            // TODO: Validate values IGNITE-15164.
+            return buildAsync().join();
+        }
+
+        /**
+         * Sets the addresses.
+         *
+         * @param addrs Addresses.
+         * @return This instance
+         */
+        public Builder addresses(String... addrs) {
+            addresses = addrs;
+
+            return this;
+        }
+
+        /**
+         * Builds the client.
+         *
+         * @return Future with the Ignite client.
+         */
+        public CompletableFuture<Ignite> buildAsync() {
+            // TODO: Async connect IGNITE-15164.
+            var cfg = new IgniteClientConfigurationImpl(null, addresses, 0);
+
+            return CompletableFuture.completedFuture(new TcpIgniteClient(cfg));
+        }
+    }
+
+    /**
+     * Immutable configuration.
+     */
+    private static class IgniteClientConfigurationImpl implements IgniteClientConfiguration {
+        /** Address finder. */
+        private final IgniteClientAddressFinder addressFinder;
+
+        /** Addresses. */
+        private final String[] addresses;
+
+        /** Retry limit. */
+        private final int retryLimit;
+
+        /**
+         * Constructor.
+         *
+         * @param addressFinder Address finder.
+         * @param addresses Addresses, can be {@code null}.
+         * @param retryLimit Retry limit.
+         */
+        IgniteClientConfigurationImpl(IgniteClientAddressFinder addressFinder, String[] addresses, int retryLimit) {
+            this.addressFinder = addressFinder;
+
+            // Copy the array so that later changes made by the caller (e.g. through the builder's varargs array)
+            // do not leak into the configuration.
+            this.addresses = addresses == null ? null : addresses.clone();
+            this.retryLimit = retryLimit;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteClientAddressFinder getAddressesFinder() {
+            return addressFinder;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String[] getAddresses() {
+            // Return a copy to keep the configuration immutable.
+            return addresses == null ? null : addresses.clone();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getRetryLimit() {
+            return retryLimit;
+        }
+    }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientAddressFinder.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientAddressFinder.java
new file mode 100644
index 0000000..0f9276c
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientAddressFinder.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+/**
+ * Supplies the addresses of Ignite server nodes that the thin client routes user requests to.
+ * The list is not necessarily static: in a cloud environment, for example, node addresses and the number
+ * of server nodes can change over time. Implementations of this interface are expected to handle such changes.
+ */
+public interface IgniteClientAddressFinder {
+    /**
+     * Gets the addresses of Ignite server nodes within a cluster. Each address is an IPv4 address or a hostname,
+     * with or without a port. When the port is omitted, Ignite generates multiple addresses for the default
+     * port range.
+     *
+     * @return Addresses of Ignite server nodes within a cluster.
+     */
+    public String[] getAddresses();
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientAuthenticationException.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientAuthenticationException.java
new file mode 100644
index 0000000..c1879c4
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientAuthenticationException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+/**
+ * Thrown when the supplied user name or password is invalid.
+ */
+public class IgniteClientAuthenticationException extends IgniteClientException {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Constructs a new exception with the specified detail message.
+     *
+     * @param msg the detail message.
+     */
+    public IgniteClientAuthenticationException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and the underlying cause.
+     *
+     * @param msg the detail message.
+     * @param cause the cause.
+     */
+    public IgniteClientAuthenticationException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientAuthorizationException.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientAuthorizationException.java
new file mode 100644
index 0000000..572506a
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientAuthorizationException.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+/**
+ * Thrown when the user lacks the permission required to perform an operation.
+ */
+public class IgniteClientAuthorizationException extends IgniteClientException {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Default detail message used when the caller does not supply one. */
+    private static final String MSG = "User is not authorized to perform this operation";
+
+    /**
+     * Constructs a new exception with the default detail message.
+     */
+    public IgniteClientAuthorizationException() {
+        super(MSG);
+    }
+
+    /**
+     * Constructs a new exception with the default detail message and the specified cause.
+     *
+     * @param cause the cause.
+     */
+    public IgniteClientAuthorizationException(Throwable cause) {
+        super(MSG, cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and cause.
+     *
+     * @param msg the detail message.
+     * @param cause the cause.
+     */
+    public IgniteClientAuthorizationException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
new file mode 100644
index 0000000..4b648ff
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+/**
+ * Configuration of the Ignite thin client.
+ * TODO: improve and finalize IGNITE-15164.
+ */
+public interface IgniteClientConfiguration {
+    /** Port used when an address does not specify one. */
+    int DFLT_PORT = 10800;
+
+    /** Default port range. */
+    int DFLT_PORT_RANGE = 100;
+
+    /** Default size of the socket send and receive buffers. */
+    int DFLT_SOCK_BUF_SIZE = 0;
+
+    /** Default value of the {@code TCP_NODELAY} socket option. */
+    boolean DFLT_TCP_NO_DELAY = true;
+
+    /**
+     * Returns the address finder.
+     *
+     * @return Address finder.
+     */
+    IgniteClientAddressFinder getAddressesFinder();
+
+    /**
+     * Returns the configured addresses.
+     *
+     * @return Addresses.
+     */
+    String[] getAddresses();
+
+    /**
+     * Returns the retry limit.
+     *
+     * @return Retry limit.
+     */
+    int getRetryLimit();
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConnectionException.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConnectionException.java
new file mode 100644
index 0000000..0a16a5c
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConnectionException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+/**
+ * Thrown when none of the Ignite servers listed in the client configuration is available any longer.
+ */
+public class IgniteClientConnectionException extends IgniteClientException {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Constructs a new exception carrying only a detail message.
+     *
+     * @param msg the detail message.
+     */
+    public IgniteClientConnectionException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and cause.
+     *
+     * @param msg the detail message.
+     * @param cause the cause.
+     */
+    public IgniteClientConnectionException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientException.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientException.java
new file mode 100644
index 0000000..a559e6c
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientException.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import org.apache.ignite.client.proto.ClientErrorCode;
+import org.apache.ignite.lang.IgniteException;
+
+/**
+ * Common thin client unchecked exception.
+ * <p>
+ * Carries an error code (see {@link ClientErrorCode}) in addition to the detail message and cause.
+ */
+public class IgniteClientException extends IgniteException {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Error code, one of the {@link ClientErrorCode} values. */
+    private final int errorCode;
+
+    /**
+     * Constructs a new exception with {@code null} as its detail message and the {@code FAILED} error code.
+     */
+    public IgniteClientException() {
+        errorCode = ClientErrorCode.FAILED;
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message. The error code is set to
+     * {@link ClientErrorCode#FAILED}.
+     *
+     * @param msg the detail message.
+     */
+    public IgniteClientException(String msg) {
+        this(msg, ClientErrorCode.FAILED);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and error code.
+     *
+     * @param msg the detail message.
+     * @param errorCode the error code.
+     */
+    public IgniteClientException(String msg, int errorCode) {
+        super(msg);
+
+        this.errorCode = errorCode;
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message, error code and cause.
+     *
+     * @param msg the detail message.
+     * @param errorCode the error code.
+     * @param cause the cause.
+     */
+    public IgniteClientException(String msg, int errorCode, Throwable cause) {
+        super(msg, cause);
+
+        this.errorCode = errorCode;
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and cause. The error code is set to
+     * {@link ClientErrorCode#FAILED}.
+     *
+     * @param msg the detail message.
+     * @param cause the cause.
+     */
+    public IgniteClientException(String msg, Throwable cause) {
+        this(msg, ClientErrorCode.FAILED, cause);
+    }
+
+    /**
+     * Returns the error code associated with this exception. See {@link ClientErrorCode}.
+     *
+     * @return Error code.
+     */
+    public int errorCode() {
+        return errorCode;
+    }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientFeatureNotSupportedByServerException.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientFeatureNotSupportedByServerException.java
new file mode 100644
index 0000000..4dff0de
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientFeatureNotSupportedByServerException.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import org.apache.ignite.internal.client.ProtocolBitmaskFeature;
+
+/**
+ * Thrown when the server does not support a feature requested by the thin client.
+ */
+public class IgniteClientFeatureNotSupportedByServerException extends IgniteClientException {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Constructs a new exception carrying only a detail message.
+     *
+     * @param msg the detail message.
+     */
+    public IgniteClientFeatureNotSupportedByServerException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Constructs a new exception whose detail message names the missing feature.
+     *
+     * @param feature Feature that the server does not support.
+     */
+    public IgniteClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature feature) {
+        super("Feature " + feature.name() + " is not supported by the server");
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and cause.
+     *
+     * @param msg the detail message.
+     * @param cause the cause.
+     */
+    public IgniteClientFeatureNotSupportedByServerException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/client/package-info.java b/modules/client/src/main/java/org/apache/ignite/client/package-info.java
new file mode 100644
index 0000000..de85786
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/client/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Ignite thin client.
+ */
+package org.apache.ignite.client;
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
new file mode 100644
index 0000000..6f54edf
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.ignite.client.IgniteClientAuthorizationException;
+import org.apache.ignite.client.IgniteClientConnectionException;
+import org.apache.ignite.client.IgniteClientException;
+
+/**
+ * Channel that sends thin client requests and processes the corresponding responses.
+ */
+interface ClientChannel extends AutoCloseable {
+    /**
+     * Sends a request for a client operation and handles the response asynchronously.
+     *
+     * @param opCode Operation code.
+     * @param payloadWriter Writes the request payload to the stream; {@code null} if the request has no payload.
+     * @param payloadReader Reads the response payload from the stream.
+     * @param <T> Response type.
+     * @return Future for the operation.
+     * @throws IgniteClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
+     * @throws IgniteClientAuthorizationException When user has no permission to perform operation.
+     * @throws IgniteClientConnectionException In case of IO errors.
+     */
+    <T> CompletableFuture<T> serviceAsync(
+        int opCode,
+        PayloadWriter payloadWriter,
+        PayloadReader<T> payloadReader
+    );
+
+    /**
+     * @return {@code true} if the channel is closed.
+     */
+    boolean closed();
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelConfiguration.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelConfiguration.java
new file mode 100644
index 0000000..db8ba06
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelConfiguration.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Settings needed to initialize a {@link TcpClientChannel}.
+ */
+final class ClientChannelConfiguration {
+    /** Address. */
+    private final InetSocketAddress addr;
+
+    /** Value of the TCP no-delay flag. */
+    private final boolean tcpNoDelay;
+
+    /** Timeout. */
+    private final int timeout;
+
+    /** Send buffer size. */
+    private final int sndBufSize;
+
+    /** Receive buffer size. */
+    private final int rcvBufSize;
+
+    /** Reconnect period (for throttling). */
+    private final long reconnectThrottlingPeriod;
+
+    /** Reconnect retries within period (for throttling). */
+    private final int reconnectThrottlingRetries;
+
+    /**
+     * Creates the configuration. Only {@code addr} is taken from the arguments; every other setting is hard-coded.
+     *
+     * @param cfg Config (currently not read).
+     * @param addr Address.
+     */
+    @SuppressWarnings("UnnecessaryThis")
+    ClientChannelConfiguration(Object cfg, InetSocketAddress addr) {
+        // TODO: Get from public API cfg IGNITE-15164.
+        this.addr = addr;
+        this.tcpNoDelay = true;
+        this.timeout = 0;
+        this.sndBufSize = 0;
+        this.rcvBufSize = 0;
+        this.reconnectThrottlingRetries = 0;
+        this.reconnectThrottlingPeriod = 0;
+    }
+
+    /**
+     * @return Address.
+     */
+    public InetSocketAddress getAddress() {
+        return addr;
+    }
+
+    /**
+     * @return Value of the TCP no-delay flag.
+     */
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    /**
+     * @return Timeout.
+     */
+    public int getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * @return Send buffer size.
+     */
+    public int getSendBufferSize() {
+        return sndBufSize;
+    }
+
+    /**
+     * @return Receive buffer size.
+     */
+    public int getReceiveBufferSize() {
+        return rcvBufSize;
+    }
+
+    /**
+     * @return Reconnect period (for throttling).
+     */
+    public long getReconnectThrottlingPeriod() {
+        return reconnectThrottlingPeriod;
+    }
+
+    /**
+     * @return Reconnect retries within period (for throttling).
+     */
+    public int getReconnectThrottlingRetries() {
+        return reconnectThrottlingRetries;
+    }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/HostAndPortRange.java b/modules/client/src/main/java/org/apache/ignite/internal/client/HostAndPortRange.java
new file mode 100644
index 0000000..0601418
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/HostAndPortRange.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.io.Serializable;
+import java.net.Inet6Address;
+import java.net.UnknownHostException;
+
+import org.apache.ignite.lang.IgniteException;
+
+/**
+ * Represents a host address together with an inclusive range of ports.
+ */
+public class HostAndPortRange implements Serializable {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Host. */
+    private final String host;
+
+    /** First port of the range. */
+    private final int portFrom;
+
+    /** Last port of the range. */
+    private final int portTo;
+
+    /**
+     * Parses {@code host[:port[..portTo]]} or {@code [ipv6Host][:port[..portTo]]} into a host and port range.
+     *
+     * @param addrStr Address string to parse.
+     * @param dfltPortFrom First port of the range used when {@code addrStr} carries no port.
+     * @param dfltPortTo Last port of the range used when {@code addrStr} carries no port.
+     * @param errMsgPrefix Prefix for the message of the exception thrown on malformed input.
+     * @return Parsed host and port range.
+     * @throws IgniteException If the address is empty or malformed.
+     */
+    public static HostAndPortRange parse(String addrStr, int dfltPortFrom, int dfltPortTo, String errMsgPrefix)
+        throws IgniteException {
+        assert dfltPortFrom <= dfltPortTo;
+
+        String host;
+        String portStr;
+        int portFrom;
+        int portTo;
+
+        if (addrStr == null || addrStr.isEmpty())
+            throw createParseError(addrStr, errMsgPrefix, "Address is empty");
+
+        if (addrStr.charAt(0) == '[') { // IPv6 with port(s)
+            int hostEndIdx = addrStr.indexOf(']');
+
+            if (hostEndIdx == -1)
+                throw createParseError(addrStr, errMsgPrefix, "Failed to parse IPv6 address, missing ']'");
+
+            host = addrStr.substring(1, hostEndIdx);
+
+            if (hostEndIdx == addrStr.length() - 1) { // no port specified, using default
+                portFrom = dfltPortFrom;
+                portTo = dfltPortTo;
+            }
+            else { // port specified: ']' must be followed by ':' and the port(s)
+                if (addrStr.charAt(hostEndIdx + 1) != ':')
+                    throw createParseError(addrStr, errMsgPrefix, "Expected ':' after ']'");
+
+                portStr = addrStr.substring(hostEndIdx + 2);
+                int[] ports = verifyPortStr(addrStr, errMsgPrefix, portStr);
+                portFrom = ports[0];
+                portTo = ports[1];
+            }
+        }
+        else { // IPv4 || IPv6 without port || empty host
+            final int colIdx = addrStr.lastIndexOf(':');
+
+            if (colIdx > 0) {
+                if (addrStr.lastIndexOf(':', colIdx - 1) != -1) { // IPv6 without [] and port
+                    try {
+                        Inet6Address.getByName(addrStr);
+                        host = addrStr;
+                        portFrom = dfltPortFrom;
+                        portTo = dfltPortTo;
+                    }
+                    catch (UnknownHostException e) {
+                        throw createParseError(addrStr, errMsgPrefix, "IPv6 is incorrect", e);
+                    }
+                }
+                else {
+                    host = addrStr.substring(0, colIdx);
+                    portStr = addrStr.substring(colIdx + 1);
+                    int[] ports = verifyPortStr(addrStr, errMsgPrefix, portStr);
+                    portFrom = ports[0];
+                    portTo = ports[1];
+                }
+            }
+            else if (colIdx == 0)
+                throw createParseError(addrStr, errMsgPrefix, "Host name is empty");
+            else { // Port is not specified, use defaults.
+                host = addrStr;
+
+                portFrom = dfltPortFrom;
+                portTo = dfltPortTo;
+            }
+        }
+
+        return new HostAndPortRange(host, portFrom, portTo);
+    }
+
+    /**
+     * Verifies a string containing a single port or a {@code from..to} port range.
+     *
+     * @param addrStr Address string (used in error messages only).
+     * @param errMsgPrefix Error message prefix.
+     * @param portStr Port or port range string.
+     * @return Array of int[portFrom, portTo].
+     * @throws IgniteException If the string is empty, a port is invalid, or the range is reversed.
+     */
+    private static int[] verifyPortStr(String addrStr, String errMsgPrefix, String portStr)
+        throws IgniteException {
+        String portFromStr;
+        String portToStr;
+
+        if (portStr == null || portStr.isEmpty())
+            throw createParseError(addrStr, errMsgPrefix, "port range is not specified");
+
+        int portRangeIdx = portStr.indexOf("..");
+
+        if (portRangeIdx >= 0) {
+            // Port range is specified.
+            portFromStr = portStr.substring(0, portRangeIdx);
+            portToStr = portStr.substring(portRangeIdx + 2);
+        }
+        else {
+            // Single port is specified.
+            portFromStr = portStr;
+            portToStr = portStr;
+        }
+
+        int portFrom = parsePort(portFromStr, addrStr, errMsgPrefix);
+        int portTo = parsePort(portToStr, addrStr, errMsgPrefix);
+
+        if (portFrom > portTo)
+            throw createParseError(addrStr, errMsgPrefix, "start port cannot be greater than end port");
+
+        return new int[] {portFrom, portTo};
+    }
+
+    /**
+     * Parses a single port and checks that it lies within 1..65535.
+     *
+     * @param portStr Port string.
+     * @param addrStr Address string (used in error messages only).
+     * @param errMsgPrefix Error message prefix.
+     * @return Parsed port.
+     * @throws IgniteException If the string is not a number or the port is out of range.
+     */
+    private static int parsePort(String portStr, String addrStr, String errMsgPrefix) throws IgniteException {
+        try {
+            int port = Integer.parseInt(portStr);
+
+            if (port <= 0 || port > 65535)
+                throw createParseError(addrStr, errMsgPrefix, "port range contains invalid port " + portStr);
+
+            return port;
+        }
+        catch (NumberFormatException e) {
+            throw createParseError(addrStr, errMsgPrefix, "port range contains invalid port " + portStr, e);
+        }
+    }
+
+    /**
+     * Creates a parse error.
+     *
+     * @param addrStr Address string.
+     * @param errMsgPrefix Error message prefix.
+     * @param errMsg Error message.
+     * @return Exception.
+     */
+    private static IgniteException createParseError(String addrStr, String errMsgPrefix, String errMsg) {
+        return new IgniteException(errMsgPrefix + " (" + errMsg + "): " + addrStr);
+    }
+
+    /**
+     * Creates a parse error that wraps a nested exception as its cause.
+     *
+     * @param addrStr Address string.
+     * @param errMsgPrefix Error message prefix.
+     * @param errMsg Error message.
+     * @param cause Cause exception.
+     * @return Exception.
+     */
+    private static IgniteException createParseError(String addrStr, String errMsgPrefix, String errMsg, Throwable cause) {
+        return new IgniteException(errMsgPrefix + " (" + errMsg + "): " + addrStr, cause);
+    }
+
+    /**
+     * Creates a range consisting of a single port.
+     *
+     * @param host Host.
+     * @param port Port.
+     */
+    public HostAndPortRange(String host, int port) {
+        this(host, port, port);
+    }
+
+    /**
+     * Creates a host and port range.
+     *
+     * @param host Host, must be non-empty.
+     * @param portFrom First port of the range, must be positive and not greater than {@code portTo}.
+     * @param portTo Last port of the range, must not exceed 65535.
+     */
+    public HostAndPortRange(String host, int portFrom, int portTo) {
+        assert host != null && !host.isEmpty();
+        assert portFrom <= portTo && portFrom > 0 && portTo <= 65535; // 65535 is valid, as parsePort() accepts it.
+
+        this.host = host;
+        this.portFrom = portFrom;
+        this.portTo = portTo;
+    }
+
+    /**
+     * @return Host.
+     */
+    public String host() {
+        return host;
+    }
+
+    /**
+     * @return Port from.
+     */
+    public int portFrom() {
+        return portFrom;
+    }
+
+    /**
+     * @return Port to.
+     */
+    public int portTo() {
+        return portTo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (!(o instanceof HostAndPortRange))
+            return false;
+
+        HostAndPortRange other = (HostAndPortRange)o;
+
+        return host.equals(other.host) && portFrom == other.portFrom && portTo == other.portTo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = host.hashCode();
+
+        res = 31 * res + portFrom;
+        res = 31 * res + portTo;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return host + ":" + (portFrom == portTo ? portFrom : portFrom + ".." + portTo);
+    }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
new file mode 100644
index 0000000..d5252fe
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.client.proto.ClientMessageUnpacker;
+
+/**
+ * Input side of a thin client payload: pairs a client channel with the unpacker to read from.
+ */
+public class PayloadInputChannel {
+    /** Client channel. */
+    private final ClientChannel ch;
+
+    /** Unpacker that serves as the input stream. */
+    private final ClientMessageUnpacker in;
+
+    /**
+     * Creates a payload input channel.
+     *
+     * @param ch Channel.
+     * @param in Unpacker.
+     */
+    PayloadInputChannel(ClientChannel ch, ClientMessageUnpacker in) {
+        this.ch = ch;
+        this.in = in;
+    }
+
+    /**
+     * Returns the client channel.
+     *
+     * @return Client channel.
+     */
+    public ClientChannel clientChannel() {
+        return ch;
+    }
+
+    /**
+     * Returns the unpacker.
+     *
+     * @return Unpacker.
+     */
+    public ClientMessageUnpacker in() {
+        return in;
+    }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
new file mode 100644
index 0000000..0db8c1c
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.io.IOException;
+
+import org.apache.ignite.client.proto.ClientMessagePacker;
+
+/**
+ * Output side of a thin client payload: pairs a client channel with a newly created packer to write to.
+ */
+public class PayloadOutputChannel implements AutoCloseable {
+    /** Client channel. */
+    private final ClientChannel ch;
+
+    /** Packer that serves as the output stream. */
+    private final ClientMessagePacker out;
+
+    /**
+     * Creates a payload output channel backed by a new {@link ClientMessagePacker}.
+     *
+     * @param ch Channel.
+     */
+    PayloadOutputChannel(ClientChannel ch) {
+        this.ch = ch;
+        this.out = new ClientMessagePacker();
+    }
+
+    /**
+     * Returns the client channel.
+     *
+     * @return Client channel.
+     */
+    public ClientChannel clientChannel() {
+        return ch;
+    }
+
+    /**
+     * Returns the packer.
+     *
+     * @return Packer.
+     */
+    public ClientMessagePacker out() {
+        return out;
+    }
+
+    /** Closes the underlying packer. */
+    @Override public void close() throws IOException {
+        out.close();
+    }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadReader.java b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadReader.java
new file mode 100644
index 0000000..141f69d
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadReader.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+/**
+ * Payload reader: a function that reads a value of type {@code T} from a {@link PayloadInputChannel}.
+ * Unlike {@link java.util.function.Function}, the functional method may throw a checked exception.
+ *
+ * @param <T> Type of the payload produced by the reader.
+ */
+@FunctionalInterface
+public interface PayloadReader<T> {
+ /**
+ * Reads the payload from the channel.
+ *
+ * @param arg Channel.
+ * @return Payload.
+ * @throws Exception on failure.
+ */
+ T apply(PayloadInputChannel arg) throws Exception;
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadWriter.java b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadWriter.java
new file mode 100644
index 0000000..92e1a72
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadWriter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+/**
+ * Payload writer: an action that writes a request payload to a {@link PayloadOutputChannel}.
+ * Unlike {@link java.util.function.Consumer}, the functional method may throw a checked exception.
+ */
+@FunctionalInterface
+public interface PayloadWriter {
+ /**
+ * Writes the payload to the channel.
+ *
+ * @param out Channel.
+ * @throws Exception on failure.
+ */
+ void accept(PayloadOutputChannel out) throws Exception;
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolBitmaskFeature.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolBitmaskFeature.java
new file mode 100644
index 0000000..d26d96c
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolBitmaskFeature.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.EnumSet;
+
+/**
+ * Defines supported bitmask features for thin client.
+ * Each feature carries an id that is used as a bit index when a feature set is converted to or from a byte array.
+ */
+public enum ProtocolBitmaskFeature {
+ /** Feature for user attributes. */
+ USER_ATTRIBUTES(0);
+
+ /** Set of all declared features; {@link #allFeaturesAsEnumSet()} returns copies of it. */
+ private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
+ EnumSet.allOf(ProtocolBitmaskFeature.class);
+
+ /** Feature id (bit index in the feature bitmask). */
+ private final int featureId;
+
+ /**
+ * Constructor.
+ *
+ * @param id Feature ID.
+ */
+ ProtocolBitmaskFeature(int id) {
+ featureId = id;
+ }
+
+ /**
+ * Gets the feature id.
+ *
+ * @return Feature ID.
+ */
+ public int featureId() {
+ return featureId;
+ }
+
+ /**
+ * Converts a feature bitmask to a set of features: a feature is included when the bit at its
+ * {@link #featureId()} is set.
+ *
+ * @param bytes Feature byte array, may be {@code null}.
+ * @return Set of supported features; an empty set when {@code bytes} is {@code null}.
+ */
+ public static EnumSet<ProtocolBitmaskFeature> enumSet(byte[] bytes) {
+ EnumSet<ProtocolBitmaskFeature> set = EnumSet.noneOf(ProtocolBitmaskFeature.class);
+
+ if (bytes == null)
+ return set;
+
+ final BitSet bSet = BitSet.valueOf(bytes);
+
+ for (ProtocolBitmaskFeature e : ProtocolBitmaskFeature.values()) {
+ if (bSet.get(e.featureId()))
+ set.add(e);
+ }
+
+ return set;
+ }
+
+ /**
+ * Converts a collection of features to a bitmask in which the bit at each feature's
+ * {@link #featureId()} is set.
+ *
+ * @param features Feature set.
+ * @return Byte array representing the given features.
+ */
+ static byte[] featuresAsBytes(Collection<ProtocolBitmaskFeature> features) {
+ final BitSet set = new BitSet();
+
+ for (ProtocolBitmaskFeature f : features)
+ set.set(f.featureId());
+
+ return set.toByteArray();
+ }
+
+ /**
+ * Gets all declared features.
+ *
+ * @return All features as a set; a new copy is returned on every call.
+ */
+ public static EnumSet<ProtocolBitmaskFeature> allFeaturesAsEnumSet() {
+ return ALL_FEATURES_AS_ENUM_SET.clone();
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
new file mode 100644
index 0000000..62e5b07
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.util.EnumSet;
+
+import org.apache.ignite.client.IgniteClientFeatureNotSupportedByServerException;
+import org.apache.ignite.client.proto.ProtocolVersion;
+
+/**
+ * Protocol Context: the protocol version together with the set of bitmask features.
+ */
+public class ProtocolContext {
+ /** Protocol version. */
+ private final ProtocolVersion ver;
+
+ /** Features. Never {@code null}. */
+ private final EnumSet<ProtocolBitmaskFeature> features;
+
+ /**
+ * Constructor.
+ *
+ * @param ver Protocol version.
+ * @param features Supported features; {@code null} is treated as an empty set. The set is stored as is (not copied).
+ */
+ public ProtocolContext(ProtocolVersion ver, EnumSet<ProtocolBitmaskFeature> features) {
+ this.ver = ver;
+ this.features = features != null ? features : EnumSet.noneOf(ProtocolBitmaskFeature.class);
+ }
+
+ /**
+ * Gets a value indicating whether a feature is supported.
+ *
+ * @param feature Feature.
+ * @return {@code true} if bitmask protocol feature supported.
+ */
+ public boolean isFeatureSupported(ProtocolBitmaskFeature feature) {
+ return features.contains(feature);
+ }
+
+ /**
+ * Check that feature is supported by the server.
+ *
+ * @param feature Feature.
+ * @throws IgniteClientFeatureNotSupportedByServerException If feature is not supported by the server.
+ */
+ public void checkFeatureSupported(ProtocolBitmaskFeature feature) throws IgniteClientFeatureNotSupportedByServerException {
+ if (!isFeatureSupported(feature))
+ throw new IgniteClientFeatureNotSupportedByServerException(feature);
+ }
+
+ /**
+ * Gets the supported features.
+ *
+ * @return Supported features. This is the internal set, not a copy.
+ */
+ public EnumSet<ProtocolBitmaskFeature> features() {
+ return features;
+ }
+
+ /**
+ * Gets the protocol version.
+ *
+ * @return Protocol version.
+ */
+ public ProtocolVersion version() {
+ return ver;
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
new file mode 100644
index 0000000..f6c60f1
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.ignite.client.IgniteClientAuthenticationException;
+import org.apache.ignite.client.IgniteClientConfiguration;
+import org.apache.ignite.client.IgniteClientConnectionException;
+import org.apache.ignite.client.IgniteClientException;
+import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.io.netty.NettyClientConnectionMultiplexer;
+
+/**
+ * Communication channel with failover and partition awareness.
+ */
+public final class ReliableChannel implements AutoCloseable {
+ /** Do nothing helper function. */
+ private static final Consumer<Integer> DO_NOTHING = (v) -> {};
+
+ /** Channel factory. */
+ private final BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory;
+
+ /** Client channel holders for each configured address. */
+ private volatile List<ClientChannelHolder> channels;
+
+ /** Index of the current channel. */
+ private volatile int curChIdx = -1;
+
+ /** Client configuration. */
+ private final IgniteClientConfiguration clientCfg;
+
+ /** Node channels. */
+ private final Map<UUID, ClientChannelHolder> nodeChannels = new ConcurrentHashMap<>();
+
+ /** Channels reinit was scheduled. */
+ private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
+
+ /** Channel is closed. */
+ private volatile boolean closed;
+
+ /** Fail (disconnect) listeners. */
+ private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+
+ /** Guard channels and curChIdx together. */
+ private final ReadWriteLock curChannelsGuard = new ReentrantReadWriteLock();
+
+ /** Connection manager. */
+ private final ClientConnectionMultiplexer connMgr;
+
+ /** Cache addresses returned by {@code ThinClientAddressFinder}. */
+ private volatile String[] prevHostAddrs;
+
+ /**
+ * Constructor. Stores the arguments, then creates and starts a {@link NettyClientConnectionMultiplexer}
+ * that is used as the connection manager.
+ *
+ * @param chFactory Channel factory.
+ * @param clientCfg Client config.
+ * @throws NullPointerException If {@code chFactory} or {@code clientCfg} is {@code null}.
+ */
+ ReliableChannel(BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory,
+ IgniteClientConfiguration clientCfg) {
+ if (chFactory == null)
+ throw new NullPointerException("chFactory");
+
+ if (clientCfg == null)
+ throw new NullPointerException("clientCfg");
+
+ this.clientCfg = clientCfg;
+ this.chFactory = chFactory;
+
+ connMgr = new NettyClientConnectionMultiplexer();
+ connMgr.start();
+ }
+
+ /**
+ * Marks this channel as closed, stops the connection manager and then closes every channel holder
+ * (when holders have been initialized).
+ */
+ @Override public synchronized void close() {
+ closed = true;
+
+ connMgr.stop();
+
+ List<ClientChannelHolder> holders = channels; // Single volatile read; may be null before initialization.
+
+ if (holders != null) {
+ for (ClientChannelHolder hld: holders)
+ hld.close();
+ }
+ }
+
+ /**
+ * Sends request and handles response asynchronously.
+ * The work is delegated to {@code handleServiceAsync}, which completes the returned future with the
+ * result or with the failure.
+ *
+ * @param opCode Operation code.
+ * @param payloadWriter Payload writer.
+ * @param payloadReader Payload reader.
+ * @param <T> response type.
+ * @return Future for the operation.
+ */
+ public <T> CompletableFuture<T> serviceAsync(
+ int opCode,
+ PayloadWriter payloadWriter,
+ PayloadReader<T> payloadReader
+ ) {
+ CompletableFuture<T> fut = new CompletableFuture<>();
+
+ // Use the only one attempt to avoid blocking async method.
+ handleServiceAsync(fut, opCode, payloadWriter, payloadReader, 1, null);
+
+ return fut;
+ }
+
+    /**
+     * Performs one round of an asynchronous request and, on a connection failure, retries on another channel.
+     * <p>
+     * A channel is obtained via {@code applyOnDefaultChannel} using at most {@code attemptsLimit} attempts, the
+     * request is sent through it, and {@code fut} is completed from the response callback. When the response fails
+     * with an {@link IgniteClientConnectionException}, the failed channel is reported via {@code onChannelFailure}
+     * and the method calls itself again with the remaining attempts. Any other failure completes {@code fut}
+     * immediately.
+     *
+     * @param fut Future to complete with the result or the failure.
+     * @param opCode Operation code.
+     * @param payloadWriter Payload writer.
+     * @param payloadReader Payload reader.
+     * @param attemptsLimit Maximum number of attempts to obtain a channel in this round.
+     * @param failure Connection failure accumulated by previous rounds, or {@code null} on the first round.
+     * @param <T> Response type.
+     */
+    private <T> void handleServiceAsync(final CompletableFuture<T> fut,
+        int opCode,
+        PayloadWriter payloadWriter,
+        PayloadReader<T> payloadReader,
+        int attemptsLimit,
+        IgniteClientConnectionException failure) {
+        ClientChannel ch;
+        // Workaround to store used attempts value within lambda body.
+        var attemptsCnt = new int[1];
+
+        try {
+            ch = applyOnDefaultChannel(channel -> channel, attemptsLimit, v -> attemptsCnt[0] = v);
+        }
+        catch (Throwable ex) {
+            if (failure != null) {
+                // Keep the original connection failure as the primary error.
+                failure.addSuppressed(ex);
+
+                fut.completeExceptionally(failure);
+
+                return;
+            }
+
+            fut.completeExceptionally(ex);
+
+            return;
+        }
+
+        ch
+            .serviceAsync(opCode, payloadWriter, payloadReader)
+            .handle((res, err) -> {
+                if (err == null) {
+                    fut.complete(res);
+
+                    return null;
+                }
+
+                // A dependent CompletableFuture stage reports upstream failures wrapped in CompletionException.
+                // Unwrap it, otherwise a connection failure is never recognized and failover is skipped.
+                Throwable cause = err;
+
+                while (cause instanceof java.util.concurrent.CompletionException && cause.getCause() != null)
+                    cause = cause.getCause();
+
+                IgniteClientConnectionException failure0 = failure;
+
+                if (cause instanceof IgniteClientConnectionException) {
+                    try {
+                        // Will try to reinit channels if topology changed.
+                        onChannelFailure(ch);
+                    }
+                    catch (Throwable ex) {
+                        fut.completeExceptionally(ex);
+
+                        return null;
+                    }
+
+                    if (failure0 == null)
+                        failure0 = (IgniteClientConnectionException)cause;
+                    else
+                        failure0.addSuppressed(cause);
+
+                    int leftAttempts = attemptsLimit - attemptsCnt[0];
+
+                    // If it is a first retry then reset attempts (as for initialization we use only 1 attempt).
+                    if (failure == null)
+                        leftAttempts = getRetryLimit() - 1;
+
+                    if (leftAttempts > 0) {
+                        handleServiceAsync(fut, opCode, payloadWriter, payloadReader, leftAttempts, failure0);
+
+                        return null;
+                    }
+                }
+                else {
+                    fut.completeExceptionally(cause instanceof IgniteClientException
+                        ? cause
+                        : new IgniteClientException(cause.getMessage(), cause));
+
+                    return null;
+                }
+
+                // No attempts left: report the accumulated connection failure.
+                fut.completeExceptionally(failure0);
+
+                return null;
+            });
+    }
+
+ /**
+ * Sends request without payload and handles response asynchronously.
+ * Equivalent to the three-argument overload with a {@code null} payload writer.
+ *
+ * @param opCode Operation code.
+ * @param payloadReader Payload reader.
+ * @param <T> Response type.
+ * @return Future for the operation.
+ */
+ public <T> CompletableFuture<T> serviceAsync(int opCode, PayloadReader<T> payloadReader) {
+ return serviceAsync(opCode, null, payloadReader);
+ }
+
+ /**
+ * Sends request with payload and handles response asynchronously.
+ * No payload reader is supplied, so the response payload is not read.
+ *
+ * @param opCode Operation code.
+ * @param payloadWriter Payload writer.
+ * @return Future for the operation.
+ */
+ public CompletableFuture<Void> requestAsync(int opCode, PayloadWriter payloadWriter) {
+ return serviceAsync(opCode, payloadWriter, null);
+ }
+
+ /**
+ * Parses {@code host:port_range} address lines and expands every port range into individual unresolved
+ * socket addresses.
+ *
+ * @param addrs Address lines.
+ * @return host:port_range address lines parsed as {@link InetSocketAddress} as a key. Value is the number of
+ * appearances of an address in {@code addrs} parameter.
+ * @throws IgniteClientException If {@code addrs} is {@code null} or empty.
+ */
+ private static Map<InetSocketAddress, Integer> parsedAddresses(String[] addrs) throws IgniteClientException {
+ if (addrs == null || addrs.length == 0)
+ throw new IgniteClientException("Empty addresses");
+
+ Collection<HostAndPortRange> ranges = new ArrayList<>(addrs.length);
+
+ for (String a : addrs) {
+ ranges.add(HostAndPortRange.parse(
+ a,
+ IgniteClientConfiguration.DFLT_PORT,
+ IgniteClientConfiguration.DFLT_PORT + IgniteClientConfiguration.DFLT_PORT_RANGE,
+ "Failed to parse Ignite server address"
+ ));
+ }
+
+ return ranges.stream()
+ .flatMap(r -> IntStream
+ .rangeClosed(r.portFrom(), r.portTo()).boxed()
+ .map(p -> InetSocketAddress.createUnresolved(r.host(), p))
+ )
+ .collect(Collectors.toMap(a -> a, a -> 1, Integer::sum)); // Duplicate addresses are counted, not dropped.
+ }
+
+ /**
+ * Roll current default channel if specified holder equals to it: the current index is advanced by one
+ * and wraps around to zero at the end of the holder list. Does nothing when the specified holder is not
+ * the current one. Runs under the write lock of {@code curChannelsGuard}.
+ *
+ * @param hld Holder that is expected to be the current default one.
+ */
+ private void rollCurrentChannel(ClientChannelHolder hld) {
+ curChannelsGuard.writeLock().lock();
+
+ try {
+ int idx = curChIdx;
+ List<ClientChannelHolder> holders = channels;
+
+ ClientChannelHolder dfltHld = holders.get(idx);
+
+ if (dfltHld == hld) {
+ idx += 1;
+
+ if (idx >= holders.size())
+ curChIdx = 0;
+ else
+ curChIdx = idx;
+ }
+ }
+ finally {
+ curChannelsGuard.writeLock().unlock();
+ }
+ }
+
+    /**
+     * On current channel failure: resolves the holder of the current default channel and delegates to
+     * {@link #onChannelFailure(ClientChannelHolder, ClientChannel)}.
+     *
+     * @param ch Channel that failed.
+     */
+    private void onChannelFailure(ClientChannel ch) {
+        ClientChannelHolder hld;
+
+        // The holder list and the current index are replaced together under curChannelsGuard, so they must be
+        // read together under it as well. Otherwise a concurrent re-initialization may pair a new (shorter)
+        // list with a stale index.
+        curChannelsGuard.readLock().lock();
+
+        try {
+            hld = channels.get(curChIdx);
+        }
+        finally {
+            // Released before delegating: the delegate takes the write lock, which cannot be acquired while
+            // the read lock is held.
+            curChannelsGuard.readLock().unlock();
+        }
+
+        // There is nothing wrong if curChIdx was concurrently changed, since channel was closed by another thread
+        // when current index was changed and no other wrong channel will be closed by current thread because
+        // onChannelFailure checks channel bound to the holder before closing it.
+        onChannelFailure(hld, ch);
+    }
+
+ /**
+ * On channel of the specified holder failure: closes the holder's channel if it is still the failed one,
+ * runs the registered fail listeners, rolls the default channel and, if a channels re-initialization
+ * was scheduled, performs it.
+ *
+ * @param hld Holder of the failed channel.
+ * @param ch Failed channel, may be {@code null}.
+ */
+ private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
+ if (ch != null && ch == hld.ch) // Do not close a channel that has already been replaced in the holder.
+ hld.closeChannel();
+
+ chFailLsnrs.forEach(Runnable::run);
+
+ // Roll current channel even if a topology changes. To help find working channel faster.
+ rollCurrentChannel(hld);
+
+ if (scheduledChannelsReinit.get())
+ channelsInit();
+ }
+
+ /**
+ * Registers a listener that is run on every channel failure.
+ * NOTE(review): listeners are kept in a plain {@code ArrayList}; confirm that registration never races
+ * with failure handling.
+ *
+ * @param chFailLsnr Listener for the channel fail (disconnect).
+ */
+ public void addChannelFailListener(Runnable chFailLsnr) {
+ chFailLsnrs.add(chFailLsnr);
+ }
+
+ /**
+ * Should the channel initialization be stopped.
+ *
+ * @return {@code true} if a new re-initialization has been scheduled or this channel is closed.
+ */
+ private boolean shouldStopChannelsReinit() {
+ return scheduledChannelsReinit.get() || closed;
+ }
+
+    /**
+     * Init channel holders to all nodes.
+     * <p>
+     * The address list is taken from the configured addresses finder, if there is one, and otherwise from the
+     * static configuration (only on the first call). When the list has changed, the holder list is rebuilt:
+     * holders of obsolete addresses are closed, holders of known addresses are reused, and new holders are created
+     * for new addresses. An address listed several times gets the same holder repeated that many times.
+     * The new list and the new default channel index are published together under the write lock.
+     *
+     * @return {@code false} if the rebuild was abandoned because another re-initialization was scheduled or the
+     *      channel was closed; {@code true} otherwise, including the case when the address list has not changed.
+     * @throws IgniteClientException If the address list is {@code null} or empty.
+     */
+    synchronized boolean initChannelHolders() {
+        List<ClientChannelHolder> holders = channels;
+
+        // Enable parallel threads to schedule new init of channel holders.
+        scheduledChannelsReinit.set(false);
+
+        Map<InetSocketAddress, Integer> newAddrs = null;
+
+        if (clientCfg.getAddressesFinder() != null) {
+            String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
+
+            // Treat a null result like an empty one, consistently with parsedAddresses(), instead of failing
+            // with a bare NullPointerException.
+            if (hostAddrs == null || hostAddrs.length == 0)
+                throw new IgniteClientException("Empty addresses");
+
+            if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
+                newAddrs = parsedAddresses(hostAddrs);
+                prevHostAddrs = hostAddrs;
+            }
+        }
+        else if (holders == null)
+            newAddrs = parsedAddresses(clientCfg.getAddresses());
+
+        // Nothing has changed since the previous initialization.
+        if (newAddrs == null)
+            return true;
+
+        Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
+        Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs.keySet());
+
+        if (holders != null) {
+            for (int i = 0; i < holders.size(); i++) {
+                ClientChannelHolder h = holders.get(i);
+
+                curAddrs.put(h.chCfg.getAddress(), h);
+                allAddrs.add(h.chCfg.getAddress());
+            }
+        }
+
+        List<ClientChannelHolder> reinitHolders = new ArrayList<>();
+
+        // The variable holds a new index of default channel after topology change.
+        // Suppose that reuse of the channel is better than open new connection.
+        int dfltChannelIdx = -1;
+
+        ClientChannelHolder currDfltHolder = null;
+
+        int idx = curChIdx;
+
+        if (idx != -1)
+            currDfltHolder = holders.get(idx);
+
+        for (InetSocketAddress addr : allAddrs) {
+            if (shouldStopChannelsReinit())
+                return false;
+
+            // Obsolete addr, to be removed.
+            if (!newAddrs.containsKey(addr)) {
+                curAddrs.get(addr).close();
+
+                continue;
+            }
+
+            // Create new holders for new addrs.
+            if (!curAddrs.containsKey(addr)) {
+                ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));
+
+                for (int i = 0; i < newAddrs.get(addr); i++)
+                    reinitHolders.add(hld);
+
+                continue;
+            }
+
+            // This holder is up to date.
+            ClientChannelHolder hld = curAddrs.get(addr);
+
+            for (int i = 0; i < newAddrs.get(addr); i++)
+                reinitHolders.add(hld);
+
+            if (hld == currDfltHolder)
+                dfltChannelIdx = reinitHolders.size() - 1;
+        }
+
+        // The previous default holder is gone (or there was none): pick a random one.
+        if (dfltChannelIdx == -1)
+            dfltChannelIdx = new Random().nextInt(reinitHolders.size());
+
+        curChannelsGuard.writeLock().lock();
+
+        try {
+            channels = reinitHolders;
+            curChIdx = dfltChannelIdx;
+        }
+        finally {
+            curChannelsGuard.writeLock().unlock();
+        }
+
+        return true;
+    }
+
+ /**
+ * Establishing connections to servers. If partition awareness feature is enabled connections are created
+ * for every configured server. Otherwise only default channel is connected.
+ * NOTE(review): the code shown here only (re)initializes the holders and connects the default channel.
+ */
+ void channelsInit() {
+ // Do not establish connections if interrupted.
+ if (!initChannelHolders())
+ return;
+
+ // Apply no-op function. Establish default channel connection.
+ applyOnDefaultChannel(channel -> null);
+ }
+
+ /**
+ * Apply specified {@code function} on the default channel, using the retry limit returned by
+ * {@link #getRetryLimit()} and no attempts callback.
+ *
+ * @param function Function to apply.
+ * @param <T> Result type.
+ * @return Result of the function.
+ */
+ private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function) {
+ return applyOnDefaultChannel(function, getRetryLimit(), DO_NOTHING);
+ }
+
+ /**
+ * Apply specified {@code function} on any of available channel.
+ * <p>
+ * Makes up to {@code attemptsLimit} attempts. Every attempt takes the current default holder under the read
+ * lock and gets (or creates) its channel. When a channel is available, the number of attempts used so far is
+ * passed to {@code attemptsCallback} and the result of the function is returned. Any throwable raised during
+ * an attempt, including one thrown by the function itself, is recorded and reported via
+ * {@code onChannelFailure}, after which the next attempt is made.
+ *
+ * @param function Function to apply.
+ * @param attemptsLimit Maximum number of attempts.
+ * @param attemptsCallback Receives the number of attempts used before the function is applied.
+ * @param <T> Result type.
+ * @return Result of the function.
+ * @throws IgniteClientConnectionException If all attempts are used up. The first recorded failure, if any,
+ * is the cause and the later ones are suppressed in it.
+ */
+ private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function,
+ int attemptsLimit,
+ Consumer<Integer> attemptsCallback) {
+ Throwable failure = null;
+
+ for (int attempt = 0; attempt < attemptsLimit; attempt++) {
+ ClientChannelHolder hld = null;
+ ClientChannel c = null;
+
+ try {
+ if (closed)
+ throw new IgniteClientException("Channel is closed");
+
+ curChannelsGuard.readLock().lock();
+
+ try {
+ hld = channels.get(curChIdx);
+ }
+ finally {
+ curChannelsGuard.readLock().unlock();
+ }
+
+ c = hld.getOrCreateChannel();
+
+ if (c != null) { // A closed holder returns null; the attempt is then used up without a failure.
+ attemptsCallback.accept(attempt + 1);
+
+ return function.apply(c);
+ }
+ }
+ catch (Throwable e) {
+ if (failure == null)
+ failure = e;
+ else
+ failure.addSuppressed(e);
+
+ onChannelFailure(hld, c);
+ }
+ }
+
+ throw new IgniteClientConnectionException("Failed to connect", failure);
+ }
+
+ /**
+ * Get retry limit.
+ *
+ * @return The configured retry limit capped by the number of channel holders, or the number of channel
+ * holders when the configured limit is not positive.
+ * @throws IgniteClientException If channel holders have not been initialized yet.
+ */
+ private int getRetryLimit() {
+ List<ClientChannelHolder> holders = channels;
+
+ if (holders == null)
+ throw new IgniteClientException("Connections to nodes aren't initialized.");
+
+ int size = holders.size();
+
+ return clientCfg.getRetryLimit() > 0 ? Math.min(clientCfg.getRetryLimit(), size) : size;
+ }
+
+ /**
+ * Channels holder: owns the channel to a single configured address, creates it lazily and applies
+ * reconnect throttling.
+ */
+ @SuppressWarnings("PackageVisibleInnerClass") // Visible for tests.
+ class ClientChannelHolder {
+ /** Channel configuration. */
+ private final ClientChannelConfiguration chCfg;
+
+ /** Channel; {@code null} until created and after it has been closed. */
+ private volatile ClientChannel ch;
+
+ /** ID of the last server node that {@link #ch} is or was connected to. */
+ private volatile UUID serverNodeId;
+
+ /** Address that the holder is bound to (chCfg.addr) is not in use now. So close the holder. */
+ private volatile boolean close;
+
+ /** Timestamps of reconnect retries; {@code null} when throttling is disabled. */
+ private final long[] reconnectRetries;
+
+ /**
+ * Constructor. Throttling is enabled only when both the throttling retries and the throttling period
+ * of the configuration are positive.
+ *
+ * @param chCfg Channel config.
+ */
+ private ClientChannelHolder(ClientChannelConfiguration chCfg) {
+ this.chCfg = chCfg;
+
+ reconnectRetries = chCfg.getReconnectThrottlingRetries() > 0 && chCfg.getReconnectThrottlingPeriod() > 0L ?
+ new long[chCfg.getReconnectThrottlingRetries()] : null;
+ }
+
+ /**
+ * Checks whether a reconnect has to be throttled. A reconnect is allowed when there is a slot whose
+ * timestamp is at least one throttling period old; that slot is then stamped with the current time.
+ *
+ * @return Whether reconnect throttling should be applied.
+ */
+ private boolean applyReconnectionThrottling() {
+ if (reconnectRetries == null)
+ return false;
+
+ long ts = System.currentTimeMillis();
+
+ for (int i = 0; i < reconnectRetries.length; i++) {
+ if (ts - reconnectRetries[i] >= chCfg.getReconnectThrottlingPeriod()) {
+ reconnectRetries[i] = ts;
+
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Get or create channel, with reconnect throttling applied.
+ *
+ * @return Channel, or {@code null} if the holder is closed.
+ */
+ private ClientChannel getOrCreateChannel()
+ throws IgniteClientConnectionException, IgniteClientAuthenticationException {
+ return getOrCreateChannel(false);
+ }
+
+ /**
+ * Get or create channel. The channel is created under the holder's monitor; the state is checked both
+ * before and after the monitor is taken.
+ *
+ * @param ignoreThrottling Whether to skip the reconnect throttling check.
+ * @return Channel, or {@code null} if the holder is closed.
+ * @throws IgniteClientConnectionException If a reconnect is not allowed due to throttling.
+ */
+ private ClientChannel getOrCreateChannel(boolean ignoreThrottling)
+ throws IgniteClientConnectionException, IgniteClientAuthenticationException {
+ if (ch == null && !close) {
+ synchronized (this) {
+ if (close)
+ return null;
+
+ if (ch != null)
+ return ch;
+
+ if (!ignoreThrottling && applyReconnectionThrottling())
+ throw new IgniteClientConnectionException("Reconnect is not allowed due to applied throttling");
+
+ ch = chFactory.apply(chCfg, connMgr);
+ }
+ }
+
+ return ch;
+ }
+
+ /**
+ * Close channel. Exceptions thrown while closing are ignored; the channel reference is reset
+ * in any case.
+ */
+ private synchronized void closeChannel() {
+ if (ch != null) {
+ try {
+ ch.close();
+ }
+ catch (Exception ignored) {
+ }
+
+ ch = null;
+ }
+ }
+
+ /**
+ * Close holder: marks it closed, removes it from the node channels map (when a server node id
+ * is known) and closes the channel.
+ */
+ void close() {
+ close = true;
+
+ if (serverNodeId != null)
+ nodeChannels.remove(serverNodeId, this);
+
+ closeChannel();
+ }
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
new file mode 100644
index 0000000..50b7ede
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.channel.ChannelFuture;
+import org.apache.ignite.client.IgniteClientAuthenticationException;
+import org.apache.ignite.client.IgniteClientAuthorizationException;
+import org.apache.ignite.client.IgniteClientConnectionException;
+import org.apache.ignite.client.IgniteClientException;
+import org.apache.ignite.client.proto.ClientErrorCode;
+import org.apache.ignite.client.proto.ClientMessagePacker;
+import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.client.proto.ProtocolVersion;
+import org.apache.ignite.client.proto.ServerMessageType;
+import org.apache.ignite.internal.client.io.ClientConnection;
+import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.io.ClientMessageHandler;
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.Nullable;
+import org.msgpack.core.buffer.ByteBufferInput;
+
+/**
+ * Implements {@link ClientChannel} over TCP.
+ */
+class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientConnectionStateHandler {
+ /** Protocol version used by default on first connection attempt. */
+ private static final ProtocolVersion DEFAULT_VERSION = ProtocolVersion.LATEST_VER;
+
+ /** Supported protocol versions. */
+ private static final Collection<ProtocolVersion> supportedVers = Collections.singletonList(
+ ProtocolVersion.V3_0_0
+ );
+
+ /** Protocol context. */
+ private volatile ProtocolContext protocolCtx;
+
+ /** Channel. */
+ private final ClientConnection sock;
+
+ /** Request id. */
+ private final AtomicLong reqId = new AtomicLong(1);
+
+ /** Pending requests. */
+ private final Map<Long, ClientRequestFuture> pendingReqs = new ConcurrentHashMap<>();
+
+ /** Closed flag. */
+ private final AtomicBoolean closed = new AtomicBoolean();
+
+ /** Executor for async operation listeners. */
+ private final Executor asyncContinuationExecutor;
+
+ /** Send/receive timeout in milliseconds. */
+ private final int timeout;
+
+ /**
+ * Constructor. Validates the configuration, opens a connection to the configured address through
+ * the connection multiplexer (passing this channel as both handler arguments) and performs the handshake
+ * with the default protocol version.
+ *
+ * @param cfg Config.
+ * @param connMgr Connection multiplexer.
+ */
+ TcpClientChannel(ClientChannelConfiguration cfg, ClientConnectionMultiplexer connMgr) {
+ validateConfiguration(cfg);
+
+ asyncContinuationExecutor = ForkJoinPool.commonPool();
+
+ timeout = cfg.getTimeout();
+
+ sock = connMgr.open(cfg.getAddress(), this, this);
+
+ handshake(DEFAULT_VERSION);
+ }
+
+ /** Closes the channel without a cause. */
+ @Override public void close() {
+ close(null);
+ }
+
+ /** Passes the received buffer to {@code processNextMessage}. */
+ @Override public void onMessage(ByteBuffer buf) throws IOException {
+ processNextMessage(buf);
+ }
+
+ /** Closes the channel, using the given exception (may be {@code null}) as the cause. */
+ @Override public void onDisconnected(@Nullable Exception e) {
+ close(e);
+ }
+
+ /**
+ * Close the channel with cause. Only the first call has an effect: it closes the connection and completes
+ * every pending request exceptionally with an {@link IgniteClientConnectionException} that wraps the cause.
+ *
+ * @param cause Reason of the close, may be {@code null}.
+ */
+ private void close(Exception cause) {
+ if (closed.compareAndSet(false, true)) {
+ sock.close();
+
+ for (ClientRequestFuture pendingReq : pendingReqs.values())
+ pendingReq.completeExceptionally(new IgniteClientConnectionException("Channel is closed", cause));
+ }
+ }
+
+ /**
+ * Sends the request and returns a future for the response. A failure to send the request is not thrown
+ * but is returned as an exceptionally completed future.
+ */
+ @Override public <T> CompletableFuture<T> serviceAsync(
+ int opCode,
+ PayloadWriter payloadWriter,
+ PayloadReader<T> payloadReader
+ ) {
+ try {
+ ClientRequestFuture fut = send(opCode, payloadWriter);
+
+ return receiveAsync(fut, payloadReader);
+ }
+ catch (Throwable t) {
+ CompletableFuture<T> fut = new CompletableFuture<>();
+ fut.completeExceptionally(t);
+
+ return fut;
+ }
+ }
+
+    /**
+     * Registers a pending request under a new request id, writes the operation code, the request id and the
+     * optional payload, and sends the message.
+     *
+     * @param opCode Operation code.
+     * @param payloadWriter Payload writer to stream or {@code null} if request has no payload.
+     * @return Request future.
+     * @throws IgniteClientException If the channel is closed or the request cannot be built or sent.
+     */
+    private ClientRequestFuture send(int opCode, PayloadWriter payloadWriter)
+        throws IgniteClientException {
+        long id = reqId.getAndIncrement();
+
+        try (PayloadOutputChannel payloadCh = new PayloadOutputChannel(this)) {
+            if (closed())
+                throw new IgniteClientConnectionException("Channel is closed");
+
+            ClientRequestFuture fut = new ClientRequestFuture();
+
+            pendingReqs.put(id, fut);
+
+            var req = payloadCh.out();
+
+            req.packInt(opCode);
+            req.packLong(id);
+
+            if (payloadWriter != null)
+                payloadWriter.accept(payloadCh);
+
+            write(req).addListener(f -> {
+                if (!f.isSuccess()) {
+                    // No response will arrive for a request that was not sent: drop it from the pending
+                    // requests so the entry does not leak.
+                    pendingReqs.remove(id);
+
+                    fut.completeExceptionally(new IgniteClientConnectionException("Failed to send request", f.cause()));
+                }
+            });
+
+            return fut;
+        }
+        catch (Throwable t) {
+            pendingReqs.remove(id);
+
+            throw convertException(t);
+        }
+    }
+
+ /**
+ * Receives the response asynchronously. The payload is read on the async continuation executor.
+ *
+ * @param pendingReq Request future.
+ * @param payloadReader Payload reader from stream.
+ * @return Future for the operation. It completes with {@code null} when there is no payload or no reader,
+ * and fails with an {@link IgniteException} when the reader throws.
+ */
+ private <T> CompletableFuture<T> receiveAsync(ClientRequestFuture pendingReq, PayloadReader<T> payloadReader) {
+ return pendingReq.thenApplyAsync(payload -> {
+ if (payload == null || payloadReader == null)
+ return null;
+
+ try {
+ return payloadReader.apply(new PayloadInputChannel(this, payload));
+ } catch (Exception e) {
+ throw new IgniteException("Failed to deserialize server response: " + e.getMessage(), e);
+ }
+ }, asyncContinuationExecutor);
+ }
+
+ /**
+ * Wraps an arbitrary failure into an {@link IgniteClientException}.
+ * <p>
+ * When the cause of {@code e} is one of the known client exception types, a new instance of that type is
+ * created around the cause; otherwise {@code e} itself is wrapped with the {@code FAILED} error code.
+ *
+ * @param e Exception to convert.
+ * @return Resulting exception.
+ */
+ private IgniteClientException convertException(Throwable e) {
+     // For every known class derived from IgniteClientException, wrap cause in a new instance.
+     // We could rethrow e.getCause() when instanceof IgniteClientException,
+     // but this results in an incomplete stack trace from the receiver thread.
+     // This is similar to IgniteUtils.exceptionConverters.
+     Throwable cause = e.getCause();
+     String msg = e.getMessage();
+
+     if (cause instanceof IgniteClientConnectionException)
+         return new IgniteClientConnectionException(msg, cause);
+
+     if (cause instanceof IgniteClientAuthorizationException)
+         return new IgniteClientAuthorizationException(msg, cause);
+
+     return new IgniteClientException(msg, ClientErrorCode.FAILED, e);
+ }
+
+ /**
+ * Decodes one incoming message and completes the future that is waiting for it.
+ * <p>
+ * While {@code protocolCtx} is not set, the message is handed to the handshake future registered under
+ * id {@code -1}. Otherwise it must be a response whose id matches a pending request.
+ *
+ * @param buf Buffer with the message.
+ * @throws IgniteClientException If the message type or the response id is unexpected.
+ * @throws IOException Declared for the unpacking operations.
+ */
+ private void processNextMessage(ByteBuffer buf) throws IgniteClientException, IOException {
+     var unpacker = new ClientMessageUnpacker(new ByteBufferInput(buf));
+
+     if (protocolCtx == null) {
+         // Process handshake.
+         pendingReqs.remove(-1L).complete(unpacker);
+
+         return;
+     }
+
+     var msgType = unpacker.unpackInt();
+
+     if (msgType != ServerMessageType.RESPONSE)
+         throw new IgniteClientException("Unexpected message type: " + msgType);
+
+     Long resId = unpacker.unpackLong();
+     int status = unpacker.unpackInt();
+
+     ClientRequestFuture waiter = pendingReqs.remove(resId);
+
+     if (waiter == null)
+         throw new IgniteClientException(String.format("Unexpected response ID [%s]", resId));
+
+     if (status != 0) {
+         // Non-zero status: the rest of the message starts with the error text.
+         var errMsg = unpacker.unpackString();
+
+         waiter.completeExceptionally(new IgniteClientException(errMsg, status));
+
+         return;
+     }
+
+     waiter.complete(unpacker);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reports the current value of the {@code closed} flag.
+ */
+ @Override public boolean closed() {
+ return closed.get();
+ }
+
+ /**
+ * Checks that the configuration carries an address with a port in the range 1024...49151.
+ *
+ * @param cfg Channel configuration.
+ * @throws IllegalArgumentException If the address is missing or its port is outside of the range.
+ */
+ private static void validateConfiguration(ClientChannelConfiguration cfg) {
+     InetSocketAddress addr = cfg.getAddress();
+
+     if (addr == null) {
+         throw new IllegalArgumentException(
+             "At least one Ignite server node must be specified in the Ignite client configuration");
+     }
+
+     int port = addr.getPort();
+
+     if (port < 1024 || port > 49151) {
+         throw new IllegalArgumentException(
+             String.format("Ignite client port %s is out of valid ports range 1024...49151", port));
+     }
+ }
+
+ /**
+ * Performs the client handshake: registers the handshake future under id {@code -1}, sends the request
+ * and waits for the response (bounded by {@code timeout} milliseconds when it is positive).
+ *
+ * @param ver Protocol version to propose.
+ * @throws IgniteClientConnectionException Declared failure type; every error is passed through
+ *      {@code convertException}.
+ */
+ private void handshake(ProtocolVersion ver)
+     throws IgniteClientConnectionException {
+     ClientRequestFuture resFut = new ClientRequestFuture();
+
+     pendingReqs.put(-1L, resFut);
+
+     try {
+         handshakeReq(ver);
+
+         ClientMessageUnpacker res;
+
+         if (timeout > 0)
+             res = resFut.get(timeout, TimeUnit.MILLISECONDS);
+         else
+             res = resFut.get();
+
+         handshakeRes(res, ver);
+     }
+     catch (Throwable err) {
+         throw convertException(err);
+     }
+ }
+
+ /**
+ * Send handshake request.
+ * <p>
+ * Fields are written in this order: major, minor and patch version numbers, client type,
+ * an empty features binary and an empty extensions map.
+ *
+ * @param proposedVer Protocol version proposed to the server.
+ * @throws IOException Declared for the packing operations.
+ */
+ private void handshakeReq(ProtocolVersion proposedVer) throws IOException {
+ try (var req = new ClientMessagePacker()) {
+ req.packInt(proposedVer.major());
+ req.packInt(proposedVer.minor());
+ req.packInt(proposedVer.patch());
+
+ req.packInt(2); // Client type: general purpose.
+
+ req.packBinaryHeader(0); // Features.
+ req.packMapHeader(0); // Extensions.
+
+ write(req).syncUninterruptibly(); // Wait for the write to finish before the packer is closed.
+ }
+ }
+
+ /**
+ * Creates a protocol context for the given version, passing the complete feature set returned by
+ * {@code ProtocolBitmaskFeature.allFeaturesAsEnumSet()}.
+ *
+ * @param ver Protocol version.
+ * @return Protocol context for a version.
+ */
+ private ProtocolContext protocolContextFromVersion(ProtocolVersion ver) {
+ return new ProtocolContext(ver, ProtocolBitmaskFeature.allFeaturesAsEnumSet());
+ }
+
+ /**
+ * Receive and handle handshake response.
+ * <p>
+ * On success the features and extensions sections are skipped and {@code protocolCtx} is set for the
+ * server version. On failure an exception is thrown, unless the server answered with a different version
+ * that this client supports; in that case the handshake is repeated with the server version.
+ *
+ * @param unpacker Unpacker positioned at the start of the handshake response.
+ * @param proposedVer Protocol version that was proposed to the server.
+ * @throws IgniteClientConnectionException If the response can't be read.
+ * @throws IgniteClientAuthenticationException If the server reported an authentication failure.
+ */
+ private void handshakeRes(ClientMessageUnpacker unpacker, ProtocolVersion proposedVer)
+     throws IgniteClientConnectionException, IgniteClientAuthenticationException {
+     try {
+         ProtocolVersion srvVer = new ProtocolVersion(unpacker.unpackShort(), unpacker.unpackShort(),
+             unpacker.unpackShort());
+
+         var errCode = unpacker.unpackInt();
+
+         if (errCode != ClientErrorCode.SUCCESS) {
+             var msg = unpacker.unpackString();
+
+             if (errCode == ClientErrorCode.AUTH_FAILED)
+                 throw new IgniteClientAuthenticationException(msg);
+
+             if (proposedVer.equals(srvVer))
+                 throw new IgniteClientException("Client protocol error: unexpected server response.");
+
+             if (!supportedVers.contains(srvVer))
+                 throw new IgniteClientException(String.format(
+                     "Protocol version mismatch: client %s / server %s. Server details: %s",
+                     proposedVer,
+                     srvVer,
+                     msg
+                 ));
+
+             // Retry with server version. The nested handshake either sets protocolCtx or throws,
+             // so there is nothing left to do here once it returns.
+             handshake(srvVer);
+
+             return;
+         }
+
+         var featuresLen = unpacker.unpackBinaryHeader();
+         unpacker.skipValue(featuresLen);
+
+         var extensionsLen = unpacker.unpackMapHeader();
+         unpacker.skipValue(extensionsLen);
+
+         protocolCtx = protocolContextFromVersion(srvVer);
+     }
+     catch (IOException e) {
+         throw handleIOError(e);
+     }
+ }
+
+ /**
+ * Hands the packed message over to the connection.
+ *
+ * @param packer Packer holding the message.
+ * @return Future of the send operation.
+ */
+ private ChannelFuture write(ClientMessagePacker packer) throws IgniteClientConnectionException {
+     return sock.send(packer.toMessageBuffer().sliceAsByteBuffer());
+ }
+
+ /**
+ * Creates a connection exception for an IO failure, using the current {@code sock} as channel info.
+ *
+ * @param ex IO exception (cause).
+ * @return Exception to be thrown by the caller.
+ */
+ private IgniteClientException handleIOError(@Nullable IOException ex) {
+ return handleIOError("sock=" + sock, ex);
+ }
+
+ /**
+ * Creates a connection exception for an IO failure. The exception is returned, not thrown.
+ *
+ * @param chInfo Additional channel info, included in the exception message.
+ * @param ex IO exception (cause).
+ * @return Exception to be thrown by the caller.
+ */
+ private IgniteClientException handleIOError(String chInfo, @Nullable IOException ex) {
+ return new IgniteClientConnectionException("Ignite cluster is unavailable [" + chInfo + ']', ex);
+ }
+
+ /**
+ * Future of a single request: completed with the unpacker of the server response,
+ * or exceptionally when the request fails.
+ */
+ private static class ClientRequestFuture extends CompletableFuture<ClientMessageUnpacker> {
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
new file mode 100644
index 0000000..36a9c5d
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.util.function.BiFunction;
+
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.IgniteClientConfiguration;
+import org.apache.ignite.client.IgniteClientException;
+import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.table.ClientTables;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.apache.ignite.tx.IgniteTransactions;
+
+/**
+ * Implementation of {@link IgniteClient} over TCP protocol.
+ */
+public class TcpIgniteClient implements Ignite {
+ /** Channel. */
+ private final ReliableChannel ch;
+
+ /** Tables. */
+ private final ClientTables tables;
+
+ /**
+ * Constructor.
+ *
+ * @param cfg Config.
+ */
+ public TcpIgniteClient(IgniteClientConfiguration cfg) {
+ this(TcpClientChannel::new, cfg);
+ }
+
+ /**
+ * Constructor with custom channel factory.
+ *
+ * @param chFactory Channel factory.
+ * @param cfg Config.
+ */
+ public TcpIgniteClient(BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory,
+ IgniteClientConfiguration cfg) {
+ ch = new ReliableChannel(chFactory, cfg);
+
+ try {
+ // TODO: Async init.
+ ch.channelsInit();
+ }
+ catch (Exception e) {
+ ch.close();
+ throw e;
+ }
+
+ tables = new ClientTables(ch);
+ }
+
+ /**
+ * Initializes new instance of {@link IgniteClient}.
+ *
+ * @param cfg Thin client configuration.
+ * @return Client with successfully opened thin client connection.
+ */
+ public static Ignite start(IgniteClientConfiguration cfg) throws IgniteClientException {
+ return new TcpIgniteClient(cfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTables tables() {
+ return tables;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTransactions transactions() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ ch.close();
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java
new file mode 100644
index 0000000..8e25329
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.io;
+
+import java.nio.ByteBuffer;
+
+import io.netty.channel.ChannelFuture;
+import org.apache.ignite.lang.IgniteException;
+
+/**
+ * Client connection: abstracts away sending and receiving messages.
+ */
+public interface ClientConnection extends AutoCloseable {
+ /**
+ * Sends a message.
+ *
+ * @param msg Message buffer.
+ * @return Future for the operation.
+ */
+ ChannelFuture send(ByteBuffer msg) throws IgniteException;
+
+ /**
+ * Closes the connection.
+ */
+ @Override void close();
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionMultiplexer.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionMultiplexer.java
new file mode 100644
index 0000000..af830f1
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionMultiplexer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.io;
+
+import java.net.InetSocketAddress;
+
+import org.apache.ignite.client.IgniteClientConnectionException;
+
+/**
+ * Client connection multiplexer: manages multiple connections with a shared resource pool (worker threads, etc).
+ */
+ public interface ClientConnectionMultiplexer { // TODO: Async methods where possible.
+ /**
+ * Initializes this instance.
+ */
+ void start();
+
+ /**
+ * Stops this instance.
+ */
+ void stop();
+
+ /**
+ * Opens a new connection.
+ *
+ * @param addr Address.
+ * @param msgHnd Incoming message handler.
+ * @param stateHnd Connection state handler.
+ * @return Created connection.
+ * @throws IgniteClientConnectionException when connection can't be established.
+ */
+ ClientConnection open(
+ InetSocketAddress addr,
+ ClientMessageHandler msgHnd,
+ ClientConnectionStateHandler stateHnd)
+ throws IgniteClientConnectionException;
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionStateHandler.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionStateHandler.java
new file mode 100644
index 0000000..c9ac00f
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionStateHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.io;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handles thin client connection state.
+ */
+public interface ClientConnectionStateHandler {
+ /**
+ * Handles connection loss.
+ *
+ * @param e Exception that caused the disconnect, can be null.
+ */
+ void onDisconnected(@Nullable Exception e);
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
new file mode 100644
index 0000000..da437a3
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Handles thin client responses and server to client notifications.
+ */
+public interface ClientMessageHandler {
+ /**
+ * Handles messages from the server.
+ *
+ * @param buf Buffer.
+ * @throws IOException on failure.
+ */
+ void onMessage(ByteBuffer buf) throws IOException;
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
new file mode 100644
index 0000000..f5955aa
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.io.netty;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.AttributeKey;
+import org.apache.ignite.internal.client.io.ClientConnection;
+import org.apache.ignite.internal.client.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.io.ClientMessageHandler;
+import org.apache.ignite.lang.IgniteException;
+
+/**
+ * Netty client connection.
+ */
+public class NettyClientConnection implements ClientConnection {
+ /** Connection attribute */
+ public static final AttributeKey<NettyClientConnection> ATTR_CONN = AttributeKey.newInstance("CONN");
+
+ /** Channel. */
+ private final Channel channel;
+
+ /** Message handler. */
+ private final ClientMessageHandler msgHnd;
+
+ /** State handler. */
+ private final ClientConnectionStateHandler stateHnd;
+
+ /**
+ * Constructor.
+ *
+ * @param channel Channel.
+ * @param msgHnd Message handler.
+ * @param stateHnd State handler.
+ */
+ public NettyClientConnection(Channel channel, ClientMessageHandler msgHnd, ClientConnectionStateHandler stateHnd) {
+ this.channel = channel;
+ this.msgHnd = msgHnd;
+ this.stateHnd = stateHnd;
+
+ channel.attr(ATTR_CONN).set(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ChannelFuture send(ByteBuffer msg) throws IgniteException {
+ return channel.writeAndFlush(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ channel.close();
+ }
+
+ /**
+ * Handles incoming message.
+ *
+ * @param buf Message.
+ * @throws IOException when message can't be decoded.
+ */
+ void onMessage(ByteBuffer buf) throws IOException {
+ msgHnd.onMessage(buf);
+ }
+
+ /**
+ * Handles disconnect.
+ *
+ * @param e Exception that caused the disconnect.
+ */
+ void onDisconnected(Exception e) {
+ stateHnd.onDisconnected(e);
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
new file mode 100644
index 0000000..c9cc4c8
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.io.netty;
+
+import java.net.InetSocketAddress;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.ignite.client.IgniteClientConnectionException;
+import org.apache.ignite.client.proto.ClientMessageDecoder;
+import org.apache.ignite.client.proto.ClientMessageEncoder;
+import org.apache.ignite.internal.client.io.ClientConnection;
+import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.io.ClientMessageHandler;
+
+/**
+ * Netty-based multiplexer.
+ */
+public class NettyClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+ /** */
+ private final NioEventLoopGroup workerGroup;
+
+ /** */
+ private final Bootstrap bootstrap;
+
+ /**
+ * Constructor.
+ */
+ public NettyClientConnectionMultiplexer() {
+ workerGroup = new NioEventLoopGroup();
+ bootstrap = new Bootstrap();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() {
+ // TODO: Is this method needed?
+ try {
+ bootstrap.group(workerGroup);
+ bootstrap.channel(NioSocketChannel.class);
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch)
+ throws Exception {
+ ch.pipeline().addLast(
+ new ClientMessageDecoder(),
+ new ClientMessageEncoder(),
+ new NettyClientMessageHandler());
+ }
+ });
+
+ } catch (Throwable t) {
+ workerGroup.shutdownGracefully();
+
+ throw t;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ workerGroup.shutdownGracefully();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientConnection open(InetSocketAddress addr,
+ ClientMessageHandler msgHnd,
+ ClientConnectionStateHandler stateHnd)
+ throws IgniteClientConnectionException {
+
+ // TODO: Make this method async.
+ ChannelFuture f = bootstrap.connect(addr).syncUninterruptibly();
+
+ return new NettyClientConnection(f.channel(), msgHnd, stateHnd);
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientMessageHandler.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientMessageHandler.java
new file mode 100644
index 0000000..670703c
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientMessageHandler.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.io.netty;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+import static org.apache.ignite.internal.client.io.netty.NettyClientConnection.ATTR_CONN;
+
+/**
+ * Netty client message handler.
+ */
+public class NettyClientMessageHandler extends ChannelInboundHandlerAdapter {
+ /** {@inheritDoc} */
+ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
+ ctx.channel().attr(ATTR_CONN).get().onMessage((ByteBuffer) msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ ctx.channel().attr(ATTR_CONN).get().onDisconnected(null);
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientColumn.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientColumn.java
new file mode 100644
index 0000000..3cc5e7c
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientColumn.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table;
+
+import org.apache.ignite.client.proto.ClientDataType;
+
+/**
+ * Schema column.
+ */
+public class ClientColumn {
+ /** Column name. */
+ private final String name;
+
+ /** Column type code (see {@link ClientDataType}). */
+ private final int type;
+
+ /** Nullable flag. */
+ private final boolean nullable;
+
+ /** Key column flag. */
+ private final boolean isKey;
+
+ /** Index of the column in the schema. */
+ private final int schemaIndex;
+
+ /**
+ * Constructor.
+ *
+ * @param name Column name.
+ * @param type Column type code.
+ * @param nullable Nullable flag.
+ * @param isKey Key column flag.
+ * @param schemaIndex Index of the column in the schema.
+ */
+ public ClientColumn(String name, int type, boolean nullable, boolean isKey, int schemaIndex) {
+ assert name != null;
+ assert schemaIndex >= 0;
+
+ this.name = name;
+ this.type = type;
+ this.nullable = nullable;
+ this.isKey = isKey;
+ this.schemaIndex = schemaIndex;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Client data type, see {@link ClientDataType}.
+ * @return Data type code.
+ */
+ public int type() {
+ return type;
+ }
+
+ /**
+ * Gets a value indicating whether this column is nullable.
+ *
+ * @return Value indicating whether this column is nullable.
+ */
+ public boolean nullable() {
+ return nullable;
+ }
+
+ /**
+ * Gets a value indicating whether this column is a part of key.
+ *
+ * @return Value indicating whether this column is a part of key.
+ */
+ public boolean key() {
+ return isKey;
+ }
+
+ /**
+ * Gets the index of the column in the schema.
+ *
+ * @return Schema index.
+ */
+ public int schemaIndex() {
+ return schemaIndex;
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
new file mode 100644
index 0000000..d3b3a8c
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientSchema.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Client schema.
+ */
+public class ClientSchema {
+ /** Schema version. Incremented on each schema modification. */
+ private final int ver;
+
+ /** Key columns count. */
+ private final int keyColumnCount;
+
+ /** Columns. */
+ private final ClientColumn[] columns;
+
+ /** Columns map by name. */
+ private final Map<String, ClientColumn> map = new HashMap<>();
+
+ /**
+ * Constructor.
+ *
+ * @param ver Schema version.
+ * @param columns Columns.
+ */
+ public ClientSchema(int ver, ClientColumn[] columns) {
+ assert ver >= 0;
+ assert columns != null;
+
+ this.ver = ver;
+ this.columns = columns;
+
+ var keyCnt = 0;
+
+ for (var col : columns) {
+ if (col.key())
+ keyCnt++;
+
+ map.put(col.name(), col);
+ }
+
+ keyColumnCount = keyCnt;
+ }
+
+ /**
+ * @return Version.
+ */
+ public int version() {
+ return ver;
+ }
+
+ /**
+ * @return Columns.
+ */
+ public @NotNull ClientColumn[] columns() {
+ return columns;
+ }
+
+ /**
+ * Gets a column by name.
+ *
+ * @param name Column name.
+ * @return Column by name.
+ */
+ public @NotNull ClientColumn column(String name) {
+ var column = map.get(name);
+
+ if (column == null)
+ throw new IgniteException("Column is not present in schema: " + name);
+
+ return column;
+ }
+
+ /**
+ * @return Key column count.
+ */
+ public int keyColumnCount() {
+ return keyColumnCount;
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
new file mode 100644
index 0000000..c766206
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ignite.client.IgniteClientException;
+import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.client.proto.ClientOp;
+import org.apache.ignite.internal.client.PayloadInputChannel;
+import org.apache.ignite.internal.client.PayloadOutputChannel;
+import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.table.InvokeProcessor;
+import org.apache.ignite.table.KeyValueBinaryView;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.TupleBuilder;
+import org.apache.ignite.table.mapper.KeyMapper;
+import org.apache.ignite.table.mapper.RecordMapper;
+import org.apache.ignite.table.mapper.ValueMapper;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.msgpack.core.MessageFormat;
+
+/**
+ * Client table API implementation.
+ */
+public class ClientTable implements Table {
+ /** */
+ private final UUID id;
+
+ /** */
+ private final String name;
+
+ /** */
+ private final ReliableChannel ch;
+
+ /** */
+ private final ConcurrentHashMap<Integer, ClientSchema> schemas = new ConcurrentHashMap<>();
+
+ /** */
+ private volatile int latestSchemaVer = -1;
+
+ /** */
+ private final Object latestSchemaLock = new Object();
+
+ /**
+ * Constructor.
+ *
+ * @param ch Channel.
+ * @param id Table id.
+ * @param name Table name.
+ */
+ public ClientTable(ReliableChannel ch, UUID id, String name) {
+ assert ch != null;
+ assert id != null;
+ assert name != null && !name.isEmpty();
+
+ this.ch = ch;
+ this.id = id;
+ this.name = name;
+ }
+
+ /**
+ * Gets the table id.
+ *
+ * @return Table id.
+ */
+ public UUID tableId() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull String tableName() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> RecordView<R> recordView(RecordMapper<R> recMapper) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> KeyValueView<K, V> kvView(KeyMapper<K> keyMapper, ValueMapper<V> valMapper) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyValueBinaryView kvView() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Table withTransaction(Transaction tx) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public TupleBuilder tupleBuilder() {
+ return new ClientTupleBuilder();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Tuple get(@NotNull Tuple keyRec) {
+ return getAsync(keyRec).join();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the key columns of {@code keyRec} using the latest known schema, then decodes the response with the
+ * schema whose version is reported in the response. Completes with {@code null} when the response payload is nil.
+ */
+ @Override public @NotNull CompletableFuture<Tuple> getAsync(@NotNull Tuple keyRec) {
+ return getLatestSchema().thenCompose(schema ->
+ ch.serviceAsync(ClientOp.TUPLE_GET, w -> writeTuple(keyRec, schema, w, true), r -> {
+ // A nil payload is mapped to a null result.
+ if (r.in().getNextFormat() == MessageFormat.NIL)
+ return null;
+
+ var schemaVer = r.in().unpackInt();
+
+ // Only the schema version is consumed here; the channel is passed on so that the column values
+ // can be read in the next stage, once the matching schema is available.
+ return new IgniteBiTuple<>(r, schemaVer);
+ })).thenCompose(biTuple -> {
+ if (biTuple == null)
+ return CompletableFuture.completedFuture(null);
+
+ assert biTuple.getKey() != null;
+ assert biTuple.getValue() != null;
+
+ // Key is the input channel, value is the schema version read above.
+ return getSchema(biTuple.getValue()).thenApply(schema -> readTuple(schema, biTuple.getKey()));
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Tuple> getAll(@NotNull Collection<Tuple> keyRecs) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Collection<Tuple>> getAllAsync(@NotNull Collection<Tuple> keyRecs) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void upsert(@NotNull Tuple rec) {
+ upsertAsync(rec).join();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes all columns of {@code rec} using the latest known schema; the response payload is not read.
+ */
+ @Override public @NotNull CompletableFuture<Void> upsertAsync(@NotNull Tuple rec) {
+ var latestSchemaFut = getLatestSchema();
+
+ return latestSchemaFut.thenCompose(latest ->
+ ch.serviceAsync(ClientOp.TUPLE_UPSERT, out -> writeTuple(rec, latest, out, false), in -> null));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void upsertAll(@NotNull Collection<Tuple> recs) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not implemented by the client.
+ *
+ * @throws UnsupportedOperationException Always.
+ */
+ @Override public @NotNull CompletableFuture<Void> upsertAllAsync(@NotNull Collection<Tuple> recs) {
+ // Returning null would break the @NotNull contract and surface later as a NullPointerException in the
+ // caller's future chain; fail the same way as the other unimplemented operations of this class.
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Tuple getAndUpsert(@NotNull Tuple rec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Tuple> getAndUpsertAsync(@NotNull Tuple rec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean insert(@NotNull Tuple rec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Boolean> insertAsync(@NotNull Tuple rec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Tuple> insertAll(@NotNull Collection<Tuple> recs) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Collection<Tuple>> insertAllAsync(@NotNull Collection<Tuple> recs) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean replace(@NotNull Tuple rec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple rec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean replace(@NotNull Tuple oldRec, @NotNull Tuple newRec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple oldRec, @NotNull Tuple newRec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Tuple getAndReplace(@NotNull Tuple rec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(@NotNull Tuple rec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean delete(@NotNull Tuple keyRec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Boolean> deleteAsync(@NotNull Tuple keyRec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean deleteExact(@NotNull Tuple rec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(@NotNull Tuple rec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Tuple getAndDelete(@NotNull Tuple rec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Tuple> getAndDeleteAsync(@NotNull Tuple rec) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Tuple> deleteAll(@NotNull Collection<Tuple> recs) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllAsync(@NotNull Collection<Tuple> recs) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Tuple> deleteAllExact(@NotNull Collection<Tuple> recs) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllExactAsync(@NotNull Collection<Tuple> recs) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends Serializable> T invoke(@NotNull Tuple keyRec, InvokeProcessor<Tuple, Tuple, T> proc) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(@NotNull Tuple keyRec, InvokeProcessor<Tuple, Tuple, T> proc) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends Serializable> Map<Tuple, T> invokeAll(@NotNull Collection<Tuple> keyRecs, InvokeProcessor<Tuple, Tuple, T> proc) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull <T extends Serializable> CompletableFuture<Map<Tuple, T>> invokeAllAsync(@NotNull Collection<Tuple> keyRecs, InvokeProcessor<Tuple, Tuple, T> proc) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable Transaction transaction() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Gets the schema with the highest version seen so far, loading it via {@code loadSchema(null)} when no schema
+ * has been registered yet.
+ *
+ * @return Future for the schema.
+ */
+ private CompletableFuture<ClientSchema> getLatestSchema() {
+ if (latestSchemaVer < 0)
+ return loadSchema(null);
+
+ return CompletableFuture.completedFuture(schemas.get(latestSchemaVer));
+ }
+
+ /**
+ * Gets the schema of the given version from the local cache, loading it when it is not cached.
+ *
+ * @param ver Schema version.
+ * @return Future for the schema.
+ */
+ private CompletableFuture<ClientSchema> getSchema(int ver) {
+ var cached = schemas.get(ver);
+
+ return cached == null ? loadSchema(ver) : CompletableFuture.completedFuture(cached);
+ }
+
+ /**
+ * Requests schemas of this table with a {@code SCHEMAS_GET} operation and registers every schema found in the
+ * response.
+ *
+ * @param ver Schema version to request; {@code null} is sent as nil instead of a version list.
+ * @return Future for the schema that was read last from the response.
+ * @throws IgniteClientException (from the response reader) If the response contains no schemas.
+ */
+ private CompletableFuture<ClientSchema> loadSchema(Integer ver) {
+ return ch.serviceAsync(ClientOp.SCHEMAS_GET, w -> {
+ w.out().packUuid(id);
+
+ if (ver == null)
+ w.out().packNil();
+ else {
+ // A specific version is sent as a one-element array.
+ w.out().packArrayHeader(1);
+ w.out().packInt(ver);
+ }
+ }, r -> {
+ int schemaCnt = r.in().unpackMapHeader();
+
+ if (schemaCnt == 0)
+ throw new IgniteClientException("Schema not found: " + ver);
+
+ ClientSchema last = null;
+
+ // readSchema also stores each schema in the cache, so all of them are kept even though only the
+ // last one is returned.
+ for (var i = 0; i < schemaCnt; i++)
+ last = readSchema(r.in());
+
+ return last;
+ });
+ }
+
+ /**
+ * Reads one schema from the stream, stores it in the schema cache and raises the latest known version when the
+ * schema read is newer.
+ * <p>
+ * Layout as consumed here: schema version (int), then an array of columns; each column is an array of at least
+ * four properties: name, type, key flag, nullable flag.
+ *
+ * @param in Unpacker positioned at the schema version.
+ * @return Schema that was read.
+ * @throws IOException If unpacking fails.
+ */
+ private ClientSchema readSchema(ClientMessageUnpacker in) throws IOException {
+ var schemaVer = in.unpackInt();
+ var colCnt = in.unpackArrayHeader();
+
+ var columns = new ClientColumn[colCnt];
+
+ for (int i = 0; i < colCnt; i++) {
+ var propCnt = in.unpackArrayHeader();
+
+ assert propCnt >= 4;
+
+ var name = in.unpackString();
+ var type = in.unpackInt();
+ var isKey = in.unpackBoolean();
+ var isNullable = in.unpackBoolean();
+
+ // Skip unknown extra properties, if any.
+ in.skipValue(propCnt - 4);
+
+ // The stream carries the key flag before the nullable flag, while the constructor receives nullable first.
+ // NOTE(review): confirm this matches the ClientColumn constructor signature.
+ var column = new ClientColumn(name, type, isNullable, isKey, i);
+ columns[i] = column;
+ }
+
+ var schema = new ClientSchema(schemaVer, columns);
+
+ schemas.put(schemaVer, schema);
+
+ // The schema is put into the map before the version is published, so a reader that sees the new
+ // latestSchemaVer finds the schema in the map. The lock keeps the compare-and-set of the version atomic.
+ synchronized (latestSchemaLock) {
+ if (schemaVer > latestSchemaVer) {
+ latestSchemaVer = schemaVer;
+ }
+ }
+
+ return schema;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return IgniteToStringBuilder.toString(ClientTable.class, this);
+ }
+
+ /**
+ * Writes the table id, the schema version and the tuple values, ordered by schema index.
+ * Columns that are not set in the tuple are written as {@code null} values.
+ *
+ * @param tuple Tuple to write; must be a {@link ClientTupleBuilder} (see the TODO below).
+ * @param schema Schema used to order the values.
+ * @param w Output channel.
+ * @param keyOnly When {@code true}, only key columns are written and non-key values in the tuple are ignored.
+ * @throws IOException If packing fails.
+ */
+ private void writeTuple(@NotNull Tuple tuple, ClientSchema schema, PayloadOutputChannel w, boolean keyOnly) throws IOException {
+ // TODO: We should accept any Tuple implementation, but this requires extending the Tuple interface
+ // with methods to retrieve column list.
+ var rec = (ClientTupleBuilder) tuple;
+
+ var vals = new Object[keyOnly ? schema.keyColumnCount() : schema.columns().length];
+
+ for (var entry : rec.map().entrySet()) {
+ // Throws if the tuple has a column the schema does not know.
+ var col = schema.column(entry.getKey());
+
+ if (keyOnly && !col.key())
+ continue;
+
+ // In key-only mode the array has keyColumnCount slots, so this assumes key columns occupy the
+ // first schema indexes - TODO confirm against the server-side column ordering.
+ vals[col.schemaIndex()] = entry.getValue();
+ }
+
+ w.out().packUuid(id);
+ w.out().packInt(schema.version());
+
+ for (var val : vals)
+ w.out().packObject(val);
+ }
+
+ /**
+ * Reads one value per schema column from the channel, in schema order, and collects them into a tuple.
+ *
+ * @param schema Schema describing the columns to read.
+ * @param r Input channel.
+ * @return Tuple with a value for every schema column.
+ * @throws CompletionException Wrapping the {@link IOException} if unpacking fails.
+ */
+ private Tuple readTuple(ClientSchema schema, PayloadInputChannel r) {
+ var res = new ClientTupleBuilder();
+
+ try {
+ var cols = schema.columns();
+
+ for (int i = 0; i < cols.length; i++) {
+ var col = cols[i];
+
+ res.set(col.name(), r.in().unpackObject(col.type()));
+ }
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ }
+
+ return res;
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
new file mode 100644
index 0000000..e001663
--- /dev/null
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import org.apache.ignite.client.proto.ClientOp;
+import org.apache.ignite.configuration.schemas.table.TableChange;
+import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.manager.IgniteTables;
+
+/**
+ * Thin client implementation of the tables API, backed by a {@link ReliableChannel}.
+ */
+public class ClientTables implements IgniteTables {
+ /** Channel used to send requests. */
+ private final ReliableChannel ch;
+
+ /**
+ * Constructor.
+ *
+ * @param ch Channel.
+ */
+ public ClientTables(ReliableChannel ch) {
+ this.ch = ch;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Table createTable(String name, Consumer<TableChange> tableInitChange) {
+ var fut = createTableAsync(name, tableInitChange);
+
+ return fut.join();
+ }
+
+ /**
+ * Asynchronous table creation. Not implemented.
+ *
+ * @param name Table name.
+ * @param tableInitChange Table configuration change.
+ * @return Never returns normally.
+ * @throws UnsupportedOperationException Always.
+ */
+ public CompletableFuture<Table> createTableAsync(String name, Consumer<TableChange> tableInitChange) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void alterTable(String name, Consumer<TableChange> tableChange) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Table getOrCreateTable(String name, Consumer<TableChange> tableInitChange) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void dropTable(String name) {
+ var fut = dropTableAsync(name);
+
+ fut.join();
+ }
+
+ /**
+ * Sends a {@code TABLE_DROP} request with the table name.
+ *
+ * @param name Table name.
+ * @return Future that completes when the request completes.
+ */
+ public CompletableFuture<Void> dropTableAsync(String name) {
+ return ch.requestAsync(ClientOp.TABLE_DROP, w -> w.out().packString(name));
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Table> tables() {
+ var fut = tablesAsync();
+
+ return fut.join();
+ }
+
+ /**
+ * Sends a {@code TABLES_GET} request and builds a {@link ClientTable} for every (id, name) pair in the response.
+ *
+ * @return Future for the list of tables.
+ */
+ public CompletableFuture<List<Table>> tablesAsync() {
+ return ch.serviceAsync(ClientOp.TABLES_GET, r -> {
+ var unpacker = r.in();
+ int size = unpacker.unpackMapHeader();
+ var tables = new ArrayList<Table>(size);
+
+ int remaining = size;
+
+ while (remaining-- > 0) {
+ var tblId = unpacker.unpackUuid();
+ var tblName = unpacker.unpackString();
+
+ tables.add(new ClientTable(ch, tblId, tblName));
+ }
+
+ return tables;
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public Table table(String name) {
+ var fut = tableAsync(name);
+
+ return fut.join();
+ }
+
+ /**
+ * Sends a {@code TABLE_GET} request with the table name and wraps the returned id into a {@link ClientTable}.
+ *
+ * @param name Table name.
+ * @return Future for the table.
+ */
+ public CompletableFuture<Table> tableAsync(String name) {
+ return ch.serviceAsync(ClientOp.TABLE_GET, w -> w.out().packString(name), r -> {
+ var tblId = r.in().unpackUuid();
+
+ return new ClientTable(ch, tblId, name);
+ });
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleBuilder.java
similarity index 66%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
copy to modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleBuilder.java
index 8576938..4aa7266 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleBuilder.java
@@ -15,52 +15,36 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.client.table;
import java.util.BitSet;
import java.util.HashMap;
-import java.util.Map;
import java.util.UUID;
-import java.util.Objects;
+
import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.binary.BinaryObjects;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.TupleBuilder;
/**
- * Buildable tuple.
+ * Client tuple builder.
*/
-public class TupleBuilderImpl implements TupleBuilder, Tuple {
+public final class ClientTupleBuilder implements TupleBuilder, Tuple {
/** Columns values. */
- protected Map<String, Object> map;
-
- /** Current schema descriptor. */
- private SchemaDescriptor schemaDesc;
+ private final HashMap<String, Object> map = new HashMap<>();
/**
- * Creates tuple builder.
+ * Gets the underlying map.
*
- * @param schemaDesc Schema descriptor.
+ * @return Underlying map
*/
- public TupleBuilderImpl(SchemaDescriptor schemaDesc) {
- Objects.requireNonNull(schemaDesc);
-
- this.schemaDesc = schemaDesc;
- map = new HashMap<>(schemaDesc.length());
+ public HashMap<String, Object> map() {
+ return map;
}
/** {@inheritDoc} */
- @Override public TupleBuilder set(String colName, Object val) {
- Column col = schema().column(colName);
-
- if (col == null)
- throw new ColumnNotFoundException("Column not found [col=" + colName + "schema=" + schemaDesc + ']');
-
- col.validate(val);
-
- map.put(colName, val);
+ @Override public TupleBuilder set(String colName, Object value) {
+ map.put(colName, value);
return this;
}
@@ -75,15 +59,14 @@ public class TupleBuilderImpl implements TupleBuilder, Tuple {
return (T)map.getOrDefault(colName, def);
}
+ /** {@inheritDoc} */
@Override public <T> T value(String colName) {
return (T)map.get(colName);
}
/** {@inheritDoc} */
@Override public BinaryObject binaryObjectField(String colName) {
- byte[] data = value(colName);
-
- return BinaryObjects.wrap(data);
+ throw new IgniteException("Not supported");
}
/** {@inheritDoc} */
@@ -130,20 +113,4 @@ public class TupleBuilderImpl implements TupleBuilder, Tuple {
@Override public BitSet bitmaskValue(String colName) {
return value(colName);
}
-
- /**
- * Get schema descriptor.
- *
- * @return Schema descriptor.
- */
- public SchemaDescriptor schema() {
- return schemaDesc;
- }
-
- /**
- * @param schemaDesc New current schema descriptor.
- */
- protected void schema(SchemaDescriptor schemaDesc) {
- this.schemaDesc = schemaDesc;
- }
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
new file mode 100644
index 0000000..8177a61
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.util.Collections;
+
+import io.netty.channel.ChannelFuture;
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.client.handler.ClientHandlerModule;
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
+import org.apache.ignite.internal.client.table.ClientTupleBuilder;
+import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.helpers.NOPLogger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Base class for client tests.
+ */
+public abstract class AbstractClientTest {
+ protected static final String DEFAULT_TABLE = "default_test_table";
+
+ protected static ChannelFuture serverFuture;
+
+ protected static Ignite server;
+
+ protected static Ignite client;
+
+ /**
+ * Starts the server first and then the client; both are kept in static fields and shared by the tests of a class.
+ *
+ * @throws Exception If startup fails.
+ */
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ serverFuture = startServer(null);
+ client = startClient();
+ }
+
+ /**
+ * Closes the client, then cancels the server future and waits for it.
+ *
+ * @throws Exception If shutdown fails.
+ */
+ @AfterAll
+ public static void afterAll() throws Exception {
+ client.close();
+ serverFuture.cancel(true);
+ serverFuture.await();
+ }
+
+ /**
+ * Drops every table reported by the server so that each test starts without tables.
+ */
+ @BeforeEach
+ public void beforeEach() {
+ for (var t : server.tables().tables())
+ server.tables().dropTable(t.tableName());
+ }
+
+ /**
+ * Starts a client.
+ *
+ * @param addrs Server addresses; when null or empty, the default local test endpoint is used.
+ * @return Client.
+ */
+ public static Ignite startClient(String... addrs) {
+ // 127.0.0.1 is used rather than another 127.0.0.0/8 address: only 127.0.0.1 is configured on the
+ // loopback interface by default on every platform (e.g. 127.0.0.2 is not reachable on macOS).
+ if (addrs == null || addrs.length == 0)
+ addrs = new String[]{"127.0.0.1:10800"};
+
+ return IgniteClient.builder().addresses(addrs).build();
+ }
+
+ /**
+ * Creates a {@link FakeIgnite}, stores it in the static {@code server} field and starts a
+ * {@link ClientHandlerModule} on top of it.
+ *
+ * @param host Not used by the current implementation.
+ * @return Future returned by {@code ClientHandlerModule#start()}.
+ * @throws InterruptedException If interrupted while starting.
+ */
+ public static ChannelFuture startServer(String host) throws InterruptedException {
+ var registry = new ConfigurationRegistry(
+ Collections.singletonList(ClientConnectorConfiguration.KEY),
+ Collections.emptyMap(),
+ Collections.singletonList(new TestConfigurationStorage(ConfigurationType.LOCAL))
+ );
+
+ server = new FakeIgnite();
+
+ var module = new ClientHandlerModule(server, NOPLogger.NOP_LOGGER);
+
+ module.prepareStart(registry);
+
+ return module.start();
+ }
+
+ /**
+ * Asserts that two tuples are either both {@code null} or hold the same set of column values.
+ * Non-null tuples must be {@link ClientTupleBuilder} instances.
+ *
+ * @param x First tuple, may be null.
+ * @param y Second tuple, may be null.
+ */
+ public static void assertTupleEquals(Tuple x, Tuple y) {
+ if (x == null || y == null) {
+ // Fails with a proper assertion message when only one of them is null.
+ assertNull(x);
+ assertNull(y);
+
+ // Both are null: they are equal. Falling through would dereference null below.
+ return;
+ }
+
+ var a = (ClientTupleBuilder) x;
+ var b = (ClientTupleBuilder) y;
+
+ assertEquals(a.map().size(), b.map().size());
+
+ for (var kv : a.map().entrySet())
+ assertEquals(kv.getValue(), b.map().get(kv.getKey()), kv.getKey());
+ }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientTableTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientTableTest.java
new file mode 100644
index 0000000..1d4a39a
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientTableTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.util.concurrent.CompletionException;
+
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Table tests: tuple upsert/get through the client against the server started by {@link AbstractClientTest}.
+ */
+public class ClientTableTest extends AbstractClientTest {
+ /**
+ * A key tuple that sets a column named "foo" makes {@code get} fail with a
+ * "Column is not present in schema" message.
+ */
+ @Test
+ public void testGetWithNullInNotNullableKeyColumnThrowsException() {
+ Table table = getDefaultTable();
+
+ var key = table.tupleBuilder().set("foo", "123").build();
+
+ var ex = assertThrows(CompletionException.class, () -> table.get(key));
+
+ assertTrue(ex.getMessage().contains("Column is not present in schema: foo"), ex.getMessage());
+ }
+
+ /**
+ * A tuple written with {@code upsert} is returned unchanged by {@code get}.
+ */
+ @Test
+ public void testUpsertGet() {
+ Table table = getDefaultTable();
+
+ var tuple = table.tupleBuilder()
+ .set("id", 123L)
+ .set("name", "John")
+ .build();
+
+ table.upsert(tuple);
+
+ // The key is built with an int literal while the record was written with a long.
+ Tuple key = table.tupleBuilder().set("id", 123).build();
+ var resTuple = table.get(key);
+
+ assertEquals("John", resTuple.stringValue("name"));
+ assertEquals(123L, resTuple.longValue("id"));
+ assertTupleEquals(tuple, resTuple);
+ }
+
+ /**
+ * Same as {@link #testUpsertGet()}, but chains {@code upsertAsync} and {@code getAsync}.
+ */
+ @Test
+ public void testUpsertGetAsync() {
+ Table table = getDefaultTable();
+
+ var tuple = table.tupleBuilder()
+ .set("id", 42L)
+ .set("name", "Jack")
+ .build();
+
+ // The key is built with an int literal while the record is written with a long.
+ Tuple key = table.tupleBuilder().set("id", 42).build();
+
+ var resTuple = table.upsertAsync(tuple).thenCompose(t -> table.getAsync(key)).join();
+
+ assertEquals("Jack", resTuple.stringValue("name"));
+ assertEquals(42L, resTuple.longValue("id"));
+ assertTupleEquals(tuple, resTuple);
+ }
+
+ /**
+ * Makes sure the default table exists on the server and returns the client-side handle for it.
+ *
+ * @return Client table.
+ */
+ private Table getDefaultTable() {
+ server.tables().getOrCreateTable(DEFAULT_TABLE, tbl -> tbl.changeReplicas(1));
+
+ return client.tables().table(DEFAULT_TABLE);
+ }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientTablesTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientTablesTest.java
new file mode 100644
index 0000000..5815bb7
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientTablesTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.util.Comparator;
+import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
+
+import org.apache.ignite.client.fakes.FakeIgniteTables;
+import org.apache.ignite.configuration.schemas.table.TableChange;
+import org.apache.ignite.internal.client.table.ClientTable;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests client tables: listing, creating and dropping tables through the client.
+ */
+public class ClientTablesTest extends AbstractClientTest {
+ /**
+ * Tables created on the server are listed by the client.
+ */
+ @Test
+ public void testTablesWhenTablesExist() {
+ server.tables().createTable(DEFAULT_TABLE, null);
+ server.tables().createTable("t", null);
+
+ var tables = client.tables().tables();
+ assertEquals(2, tables.size());
+
+ // Sort by name so that the assertions do not depend on the order of the returned list.
+ tables.sort(Comparator.comparing(Table::tableName));
+ assertEquals(DEFAULT_TABLE, tables.get(0).tableName());
+ assertEquals("t", tables.get(1).tableName());
+ }
+
+ /**
+ * The client returns an empty list when there are no tables.
+ */
+ @Test
+ public void testTablesWhenNoTablesExist() {
+ var tables = client.tables().tables();
+
+ assertEquals(0, tables.size());
+ }
+
+ /**
+ * A table created through the client is visible on the server with the same name and id.
+ */
+ @Test
+ @Disabled("IGNITE-15179")
+ public void testCreateTable() {
+ var clientTable = client.tables().createTable("t1", t -> t.changeReplicas(2));
+ assertEquals("t1", clientTable.tableName());
+
+ var serverTables = server.tables().tables();
+ assertEquals(1, serverTables.size());
+
+ var serverTable = serverTables.get(0);
+ assertEquals("t1", serverTable.tableName());
+ assertEquals(((TableImpl) serverTable).tableId(), ((ClientTable) clientTable).tableId());
+ }
+
+ /**
+ * Creating a table that already exists fails and leaves a single table on the server.
+ */
+ @Test
+ @Disabled("IGNITE-15179")
+ public void testCreateTableWhenExists() {
+ Consumer<TableChange> consumer = t -> t.changeReplicas(2);
+ client.tables().createTable(DEFAULT_TABLE, consumer);
+
+ var ex = assertThrows(CompletionException.class,
+ () -> client.tables().createTable(DEFAULT_TABLE, consumer));
+
+ assertTrue(ex.getMessage().endsWith(FakeIgniteTables.TABLE_EXISTS));
+ assertEquals(IgniteClientException.class, ex.getCause().getClass());
+
+ var serverTables = server.tables().tables();
+ assertEquals(1, serverTables.size());
+ }
+
+ /**
+ * A table dropped through the client is gone from the server.
+ */
+ @Test
+ public void testDropTable() {
+ server.tables().createTable("t", t -> t.changeReplicas(0));
+
+ client.tables().dropTable("t");
+
+ assertEquals(0, server.tables().tables().size());
+ }
+
+ /**
+ * Dropping a table name that was never created; there is no assertion, the test passes as long as the call
+ * does not throw.
+ */
+ @Test
+ public void testDropTableInvalidName() {
+ client.tables().dropTable("foo");
+ }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
new file mode 100644
index 0000000..3c0e34c
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import org.apache.ignite.lang.IgniteException;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests client connection to various addresses.
+ */
+public class ConnectionTest extends AbstractClientTest {
+ @Test
+ public void testEmptyNodeAddress() {
+ var ex = assertThrows(IgniteException.class, () -> testConnection(""));
+ assertEquals("Failed to parse Ignite server address (Address is empty): ", ex.getMessage());
+ }
+
+ @Test
+ public void testNullNodeAddresses() {
+ var ex = assertThrows(IgniteException.class, () -> testConnection(null, null));
+ assertEquals("Failed to parse Ignite server address (Address is empty): null", ex.getMessage());
+ }
+
+ @Test
+ public void testValidNodeAddresses() throws Exception {
+ testConnection("127.0.0.1:10800");
+ }
+
+ @Test
+ public void testInvalidNodeAddresses() throws Exception {
+ var ex = assertThrows(IgniteClientConnectionException.class,
+ () -> testConnection("127.0.0.1:47500"));
+
+ assertEquals("Connection refused: /127.0.0.1:47500", ex.getCause().getMessage());
+ }
+
+ @Test
+ public void testValidInvalidNodeAddressesMix() throws Exception {
+ testConnection("127.0.0.1:47500", "127.0.0.1:10801", "127.0.0.1:10800");
+ }
+
+ @Disabled("IPv6 is not enabled by default on some systems.")
+ @Test
+ public void testIPv6NodeAddresses() throws Exception {
+ testConnection("[::1]:10800");
+ }
+
+ private void testConnection(String... addrs) throws Exception {
+ AbstractClientTest.startClient(addrs).close();
+ }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestConfigurationStorage.java b/modules/client/src/test/java/org/apache/ignite/client/TestConfigurationStorage.java
new file mode 100644
index 0000000..5165848
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestConfigurationStorage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
+import org.apache.ignite.internal.configuration.storage.ConfigurationStorageListener;
+import org.apache.ignite.internal.configuration.storage.Data;
+import org.apache.ignite.internal.configuration.storage.StorageException;
+
+/**
+ * Configuration storage for tests.
+ */
+public class TestConfigurationStorage implements ConfigurationStorage {
+ /** Listeners. */
+ private final Set<ConfigurationStorageListener> listeners = new HashSet<>();
+
+ /** Configuration type. */
+ private final ConfigurationType type;
+
+ /**
+ * @param type Configuration type.
+ */
+ public TestConfigurationStorage(ConfigurationType type) {
+ this.type = type;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Data readAll() throws StorageException {
+ return new Data(Collections.emptyMap(), 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
+ for (ConfigurationStorageListener listener : listeners)
+ listener.onEntriesChanged(new Data(newValues, version + 1));
+
+ return CompletableFuture.completedFuture(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void registerConfigurationListener(ConfigurationStorageListener listener) {
+ listeners.add(listener);
+ }
+
+ @Override public void notifyApplied(long storageRevision) {
+ }
+
+ /** {@inheritDoc} */
+ @Override public ConfigurationType type() {
+ return type;
+ }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
new file mode 100644
index 0000000..4f5504f
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.fakes;
+
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.apache.ignite.tx.IgniteTransactions;
+
+/**
+ * Fake {@link Ignite} for client tests; only the tables facade is functional.
+ */
+public class FakeIgnite implements Ignite {
+    /** Fake tables facade returned by {@link #tables()}. */
+    private final IgniteTables fakeTables = new FakeIgniteTables();
+
+    /** {@inheritDoc} */
+    @Override public IgniteTables tables() {
+        return fakeTables;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteTransactions transactions() {
+        // Transactions are not provided by this fake.
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        // No-op.
+    }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
new file mode 100644
index 0000000..b506eda
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.fakes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+import org.apache.ignite.configuration.schemas.table.TableChange;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Fake tables.
+ */
+public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
+ /** */
+ public static final String TABLE_EXISTS = "Table exists";
+
+ /** */
+ private final ConcurrentHashMap<String, TableImpl> tables = new ConcurrentHashMap<>();
+
+ /** */
+ private final ConcurrentHashMap<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public Table createTable(String name, Consumer<TableChange> tableInitChange) {
+ var newTable = getNewTable(name);
+
+ var oldTable = tables.putIfAbsent(name, newTable);
+
+ if (oldTable != null)
+ throw new IgniteException(TABLE_EXISTS);
+
+ tablesById.put(newTable.tableId(), newTable);
+
+ return newTable;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void alterTable(String name, Consumer<TableChange> tableChange) {
+ throw new IgniteException("Not supported");
+ }
+
+ /** {@inheritDoc} */
+ @Override public Table getOrCreateTable(String name, Consumer<TableChange> tableInitChange) {
+ var newTable = getNewTable(name);
+
+ var oldTable = tables.putIfAbsent(name, newTable);
+
+ if (oldTable != null)
+ return oldTable;
+
+ tablesById.put(newTable.tableId(), newTable);
+
+ return newTable;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void dropTable(String name) {
+ var table = tables.remove(name);
+
+ if (table != null)
+ tablesById.remove(table.tableId());
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<Table> tables() {
+ return new ArrayList<>(tables.values());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Table table(String name) {
+ return tables.get(name);
+ }
+
+ @NotNull private TableImpl getNewTable(String name) {
+ UUID tableId = UUID.randomUUID();
+ return new TableImpl(new FakeInternalTable(name, tableId), getSchemaReg(tableId), null, null);
+ }
+
+ @NotNull private SchemaRegistryImpl getSchemaReg(UUID tableId) {
+ return new SchemaRegistryImpl(1, v -> getSchema(v, tableId));
+ }
+
+ /**
+ * Gets the schema.
+ * @param v Version.
+ * @param tableId id.
+ * @return Schema descriptor.
+ */
+ private SchemaDescriptor getSchema(Integer v, UUID tableId) {
+ if (v != 1)
+ return null;
+
+ return new SchemaDescriptor(
+ tableId,
+ 1,
+ new Column[]{new Column("id", NativeTypes.INT64, false)},
+ new Column[]{new Column("name", NativeTypes.STRING, true)});
+ }
+
+ /** {@inheritDoc} */
+ @Override public TableImpl table(UUID id) {
+ return tablesById.get(id);
+ }
+
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
new file mode 100644
index 0000000..a32eea9
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.fakes;
+
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.schema.SchemaMode;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Fake internal table that keeps rows in memory.
+ * <p>
+ * Only {@link #get} and {@link #upsert} are implemented. Every other data operation returns a future
+ * that is already failed with {@link UnsupportedOperationException} (rather than a {@code null} future),
+ * so that an accidental use is reported with a clear message instead of a {@link NullPointerException}
+ * in the caller.
+ */
+public class FakeInternalTable implements InternalTable {
+    /** Table name. */
+    private final String tableName;
+
+    /** Table id. */
+    private final UUID tableId;
+
+    /** Stored rows, keyed by the key slice of the row. */
+    private final ConcurrentHashMap<ByteBuffer, BinaryRow> data = new ConcurrentHashMap<>();
+
+    /**
+     * @param tableName Table name.
+     * @param tableId Table id.
+     */
+    public FakeInternalTable(String tableName, UUID tableId) {
+        this.tableName = tableName;
+        this.tableId = tableId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull UUID tableId() {
+        return tableId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull String tableName() {
+        return tableName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull SchemaMode schemaMode() {
+        return SchemaMode.STRICT_SCHEMA;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void schema(SchemaMode schemaMode) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<BinaryRow> get(BinaryRow keyRow, @Nullable Transaction tx) {
+        return CompletableFuture.completedFuture(data.get(keyRow.keySlice()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows, @Nullable Transaction tx) {
+        return unsupported("getAll");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Void> upsert(BinaryRow row, @Nullable Transaction tx) {
+        data.put(row.keySlice(), row);
+
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Void> upsertAll(Collection<BinaryRow> rows, @Nullable Transaction tx) {
+        return unsupported("upsertAll");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<BinaryRow> getAndUpsert(BinaryRow row, @Nullable Transaction tx) {
+        return unsupported("getAndUpsert");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Boolean> insert(BinaryRow row, @Nullable Transaction tx) {
+        return unsupported("insert");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRow> rows, @Nullable Transaction tx) {
+        return unsupported("insertAll");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Boolean> replace(BinaryRow row, @Nullable Transaction tx) {
+        return unsupported("replace");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Boolean> replace(BinaryRow oldRow, BinaryRow newRow, @Nullable Transaction tx) {
+        return unsupported("replace");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<BinaryRow> getAndReplace(BinaryRow row, @Nullable Transaction tx) {
+        return unsupported("getAndReplace");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Boolean> delete(BinaryRow keyRow, @Nullable Transaction tx) {
+        return unsupported("delete");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Boolean> deleteExact(BinaryRow oldRow, @Nullable Transaction tx) {
+        return unsupported("deleteExact");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<BinaryRow> getAndDelete(BinaryRow row, @Nullable Transaction tx) {
+        return unsupported("getAndDelete");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRow> rows, @Nullable Transaction tx) {
+        return unsupported("deleteAll");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRow> rows, @Nullable Transaction tx) {
+        return unsupported("deleteAllExact");
+    }
+
+    /**
+     * Creates a future that is already completed exceptionally for an operation this fake does not implement.
+     *
+     * @param op Operation name, used in the exception message.
+     * @param <T> Future result type.
+     * @return Failed future.
+     */
+    private static <T> CompletableFuture<T> unsupported(String op) {
+        CompletableFuture<T> fut = new CompletableFuture<>();
+
+        fut.completeExceptionally(
+            new UnsupportedOperationException("Operation is not supported by FakeInternalTable: " + op));
+
+        return fut;
+    }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/internal/client/HostAndPortRangeTest.java b/modules/client/src/test/java/org/apache/ignite/internal/client/HostAndPortRangeTest.java
new file mode 100644
index 0000000..6a84d72
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/internal/client/HostAndPortRangeTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import org.apache.ignite.lang.IgniteException;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests HostAndPortRange.
+ */
+public class HostAndPortRangeTest {
+    /** Lower bound of the default port range handed to the parser. */
+    private static final int DFLT_PORT_FROM = 18360;
+
+    /** Upper bound of the default port range handed to the parser. */
+    private static final int DFLT_PORT_TO = 18362;
+
+    /** Error message prefix handed to the parser. */
+    private static final String ERR_MSG_PREFIX = "";
+
+    /**
+     * Tests correct input address with IPv4 host and port range.
+     */
+    @Test
+    public void testParseIPv4WithPortRange() {
+        assertParsed("127.0.0.1:8080..8090", "127.0.0.1", 8080, 8090);
+    }
+
+    /**
+     * Tests correct input address with IPv4 host and single port.
+     */
+    @Test
+    public void testParseIPv4WithSinglePort() {
+        assertParsed("127.0.0.1:8080", "127.0.0.1", 8080, 8080);
+    }
+
+    /**
+     * Tests correct input address with IPv4 host and no port.
+     */
+    @Test
+    public void testParseIPv4NoPort() {
+        assertParsed("127.0.0.1", "127.0.0.1", 18360, 18362);
+    }
+
+    /**
+     * Tests correct input address with IPv6 host and port range.
+     */
+    @Test
+    public void testParseIPv6WithPortRange() {
+        assertParsed("[::1]:8080..8090", "::1", 8080, 8090);
+    }
+
+    /**
+     * Tests correct input address with IPv6 host and single port.
+     */
+    @Test
+    public void testParseIPv6WithSinglePort() {
+        assertParsed("[3ffe:2a00:100:7031::]:8080", "3ffe:2a00:100:7031::", 8080, 8080);
+    }
+
+    /**
+     * Tests correct input address with IPv6 host and no port.
+     */
+    @Test
+    public void testParseIPv6NoPort() {
+        assertParsed("::FFFF:129.144.52.38", "::FFFF:129.144.52.38", 18360, 18362);
+    }
+
+    /**
+     * Tests incorrect input address with IPv6 host (no brackets) and port.
+     */
+    @Test
+    public void testParseIPv6IncorrectHost() {
+        assertParseFails("3ffe:2a00:100:7031", "IPv6 is incorrect");
+    }
+
+    /**
+     * Tests empty host and port.
+     */
+    @Test
+    public void testParseNoHost() {
+        assertParseFails(":8080", "Host name is empty");
+    }
+
+    /**
+     * Tests empty address string.
+     */
+    @Test
+    public void testParseNoAddress() {
+        assertParseFails("", "Address is empty");
+    }
+
+    /**
+     * Parses the address with the default port range and error prefix shared by all tests.
+     *
+     * @param addrStr Address string.
+     * @return Parsed range.
+     */
+    private static HostAndPortRange parse(String addrStr) {
+        return HostAndPortRange.parse(addrStr, DFLT_PORT_FROM, DFLT_PORT_TO, ERR_MSG_PREFIX);
+    }
+
+    /**
+     * Asserts that the address is parsed into the given host and port range.
+     *
+     * @param addrStr Address string.
+     * @param host Expected host.
+     * @param portFrom Expected first port.
+     * @param portTo Expected last port.
+     */
+    private static void assertParsed(String addrStr, String host, int portFrom, int portTo) {
+        HostAndPortRange actual = parse(addrStr);
+        HostAndPortRange expected = new HostAndPortRange(host, portFrom, portTo);
+
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Asserts that parsing fails with an {@link IgniteException} whose message contains the given text.
+     *
+     * @param addrStr Address string.
+     * @param expMsgPart Text the exception message must contain.
+     */
+    private static void assertParseFails(String addrStr, String expMsgPart) {
+        var ex = assertThrows(IgniteException.class, () -> parse(addrStr));
+
+        assertTrue(ex.getMessage().contains(expMsgPart), ex.getMessage());
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/LiveSchemaChangeKVViewTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/LiveSchemaChangeKVViewTest.java
index d52301d..d891f08 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/LiveSchemaChangeKVViewTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/LiveSchemaChangeKVViewTest.java
@@ -17,12 +17,11 @@
package org.apache.ignite.internal.runner.app;
-import java.util.List;
import org.apache.ignite.app.Ignite;
+import org.apache.ignite.internal.schema.SchemaAware;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.table.ColumnNotFoundException;
import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.TupleBuilderImpl;
import org.apache.ignite.schema.SchemaMode;
import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.Table;
@@ -31,6 +30,8 @@ import org.apache.ignite.table.TupleBuilder;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import java.util.List;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -212,7 +213,7 @@ class LiveSchemaChangeKVViewTest extends AbstractSchemaChangeTest {
TupleBuilder newVerBuilder = tbl.tupleBuilder();
- SchemaDescriptor schema = ((TupleBuilderImpl)newVerBuilder).schema();
+ SchemaDescriptor schema = ((SchemaAware)newVerBuilder).schema();
assertTrue(schema.columnNames().contains("valStrNew"));
assertTrue(schema.columnNames().contains("valIntNew"));
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/LiveSchemaChangeTableTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/LiveSchemaChangeTableTest.java
index f421e4f..4d0b22d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/LiveSchemaChangeTableTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/LiveSchemaChangeTableTest.java
@@ -17,20 +17,21 @@
package org.apache.ignite.internal.runner.app;
-import java.util.List;
-import java.util.UUID;
import org.apache.ignite.app.Ignite;
+import org.apache.ignite.internal.schema.SchemaAware;
import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.schema.SchemaMode;
import org.apache.ignite.internal.table.ColumnNotFoundException;
import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.TupleBuilderImpl;
+import org.apache.ignite.schema.SchemaMode;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.TupleBuilder;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import java.util.List;
+import java.util.UUID;
+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -133,7 +134,7 @@ class LiveSchemaChangeTableTest extends AbstractSchemaChangeTest {
TupleBuilder newVerBuilder = tbl.tupleBuilder();
- SchemaDescriptor schema = ((TupleBuilderImpl)newVerBuilder).schema();
+ SchemaDescriptor schema = ((SchemaAware)newVerBuilder).schema();
assertTrue(schema.columnNames().contains("valStrNew"));
assertTrue(schema.columnNames().contains("valIntNew"));
@@ -292,7 +293,7 @@ class LiveSchemaChangeTableTest extends AbstractSchemaChangeTest {
TupleBuilder newVerBuilder = tbl.tupleBuilder();
- SchemaDescriptor schema = ((TupleBuilderImpl)newVerBuilder).schema();
+ SchemaDescriptor schema = ((SchemaAware)newVerBuilder).schema();
assertEquals(2, schema.version());
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaAware.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaAware.java
new file mode 100644
index 0000000..1fc98f3
--- /dev/null
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaAware.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+/**
+ * Schema-aware entity: an object that can report the {@link SchemaDescriptor} it is associated with.
+ */
+public interface SchemaAware {
+ /**
+ * Gets the schema descriptor.
+ *
+ * @return Schema descriptor of this entity.
+ */
+ SchemaDescriptor schema();
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java b/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
new file mode 100644
index 0000000..2d8131f
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.UUID;
+
+/**
+ * Internal tables facade that provides low-level methods for table operations.
+ */
+public interface IgniteTablesInternal {
+ /**
+ * Gets a table by id.
+ *
+ * @param id Table ID.
+ * @return Table, or {@code null} if no table with the given id exists.
+ */
+ TableImpl table(UUID id);
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
index dc9ab74..dc57d67 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
@@ -23,13 +23,14 @@ import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjects;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.SchemaAware;
import org.apache.ignite.table.Tuple;
import org.jetbrains.annotations.NotNull;
/**
* Row to RowChunk adapter.
*/
-public abstract class RowChunkAdapter implements Tuple {
+public abstract class RowChunkAdapter implements Tuple, SchemaAware {
/**
* @param colName Column name.
* @return Column.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java
index f04aa71..b189c25 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
import java.util.Objects;
import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaAware;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.table.Tuple;
@@ -30,7 +31,7 @@ import org.jetbrains.annotations.Nullable;
* <p>
* Provides methods to access columns values by column names.
*/
-public class TableRow extends RowChunkAdapter {
+public class TableRow extends RowChunkAdapter implements SchemaAware {
/** Schema. */
private final SchemaDescriptor schema;
@@ -81,8 +82,13 @@ public class TableRow extends RowChunkAdapter {
return row;
}
+ /** {@inheritDoc} */
+ @Override public SchemaDescriptor schema() {
+ return schema;
+ }
+
/** Key column chunk. */
- private class KeyRowChunk extends RowChunkAdapter {
+ private class KeyRowChunk extends RowChunkAdapter implements SchemaAware {
/** {@inheritDoc} */
@Override protected Row row() {
return row;
@@ -99,6 +105,11 @@ public class TableRow extends RowChunkAdapter {
return col;
}
+
+ /** {@inheritDoc} */
+ @Override public SchemaDescriptor schema() {
+ return schema;
+ }
}
/** Value column chunk. */
@@ -119,5 +130,10 @@ public class TableRow extends RowChunkAdapter {
return col;
}
+
+ /** {@inheritDoc} */
+ @Override public SchemaDescriptor schema() {
+ return schema;
+ }
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
index 8576938..5f3fa08 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
@@ -25,6 +25,7 @@ import java.util.Objects;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjects;
import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaAware;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.TupleBuilder;
@@ -32,7 +33,7 @@ import org.apache.ignite.table.TupleBuilder;
/**
* Buildable tuple.
*/
-public class TupleBuilderImpl implements TupleBuilder, Tuple {
+public class TupleBuilderImpl implements TupleBuilder, Tuple, SchemaAware {
/** Columns values. */
protected Map<String, Object> map;
@@ -65,6 +66,23 @@ public class TupleBuilderImpl implements TupleBuilder, Tuple {
return this;
}
+ /**
+ * Sets column value.
+ *
+ * @param col Column.
+ * @param value Value to set.
+ * @return {@code this} for chaining.
+ */
+ public TupleBuilder set(Column col, Object value) {
+ assert col != null;
+
+ col.validate(value);
+
+ map.put(col.name(), value);
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public Tuple build() {
return this;
@@ -75,6 +93,7 @@ public class TupleBuilderImpl implements TupleBuilder, Tuple {
return (T)map.getOrDefault(colName, def);
}
+ /** {@inheritDoc} */
@Override public <T> T value(String colName) {
return (T)map.get(colName);
}
@@ -131,12 +150,8 @@ public class TupleBuilderImpl implements TupleBuilder, Tuple {
return value(colName);
}
- /**
- * Get schema descriptor.
- *
- * @return Schema descriptor.
- */
- public SchemaDescriptor schema() {
+ /** {@inheritDoc} */
+ @Override public SchemaDescriptor schema() {
return schemaDesc;
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 380a269..c84b5e3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.schema.SchemaModificationException;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -78,7 +79,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Table manager.
*/
-public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables {
+public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables, IgniteTablesInternal {
/** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(TableManager.class);
@@ -106,6 +107,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** Tables. */
private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
+ /** Tables by ID. */
+ private final Map<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+
/**
* Creates a new table manager.
*
@@ -167,6 +171,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
var table = new TableImpl(internalTable, schemaReg, this, null);
tables.put(name, table);
+ tablesById.put(table.tableId(), table);
onEvent(TableEvent.CREATE, new TableEventParameters(table), null);
}
@@ -598,10 +603,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return false;
if (e == null) {
- Table droppedTable = tables.remove(tableName);
+ TableImpl droppedTable = tables.remove(tableName);
assert droppedTable != null;
+ tablesById.remove(droppedTable.tableId());
+
dropTblFut.complete(null);
}
else
@@ -696,7 +703,49 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/**
* Gets a table if it exists or {@code null} if it was not created or was removed before.
*
- * @param name Table name.
+ * @param id Table ID.
+ * Must not be {@code null}.
+ * @return A table or {@code null} if table does not exist.
+ */
+ @Override public TableImpl table(UUID id) {
+ var tbl = tablesById.get(id);
+
+ if (tbl != null)
+ return tbl;
+
+ CompletableFuture<TableImpl> getTblFut = new CompletableFuture<>();
+
+ EventListener<TableEventParameters> clo = new EventListener<>() {
+ @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
+ if (!id.equals(parameters.tableId()))
+ return false;
+
+ if (e == null)
+ getTblFut.complete(parameters.table());
+ else
+ getTblFut.completeExceptionally(e);
+
+ return true;
+ }
+
+ @Override public void remove(@NotNull Throwable e) {
+ getTblFut.completeExceptionally(e);
+ }
+ };
+
+ listen(TableEvent.CREATE, clo);
+
+ tbl = tablesById.get(id);
+
+ if (tbl != null && getTblFut.complete(tbl) || getTblFut.complete(null))
+ removeListener(TableEvent.CREATE, clo, null);
+
+ return getTblFut.join();
+ }
+
+ /**
+ * Gets a table if it exists or {@code null} if it was not created or was removed before.
+ *
* @param checkConfiguration True when the method checks a configuration before tries to get a table,
* false otherwise.
* @return A table or {@code null} if table does not exist.
diff --git a/parent/pom.xml b/parent/pom.xml
index 9aeaaa9..c6e3b68 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -85,6 +85,7 @@
<disruptor.version>3.3.7</disruptor.version>
<metrics.version>4.0.2</metrics.version>
<jctools.version>3.3.0</jctools.version>
+ <msgpack.version>0.8.21</msgpack.version>
<!-- Plugins versions -->
<apache.rat.plugin.version>0.13</apache.rat.plugin.version>
@@ -280,6 +281,18 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-client-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-client-handler</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- 3rd party dependencies -->
<dependency>
<groupId>org.jetbrains</groupId>
@@ -431,6 +444,18 @@
<version>${slf4j.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack-core</artifactId>
+ <version>${msgpack.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>jackson-dataformat-msgpack</artifactId>
+ <version>${msgpack.version}</version>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.ignite</groupId>
diff --git a/pom.xml b/pom.xml
index 88c77be..11cdcd5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,6 +42,9 @@
<module>modules/calcite</module>
<module>modules/cli</module>
<module>modules/cli-common</module>
+ <module>modules/client</module>
+ <module>modules/client-common</module>
+ <module>modules/client-handler</module>
<module>modules/configuration</module>
<module>modules/configuration-annotation-processor</module>
<module>modules/configuration-api</module>