You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/08/02 12:49:29 UTC

[ignite-3] branch main updated: IGNITE-15162 Thin 3.0: Add buffer pooling (#252)

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e37640  IGNITE-15162 Thin 3.0: Add buffer pooling (#252)
5e37640 is described below

commit 5e376402965e27e06154d81b0bfbbe6c05b85741
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Aug 2 15:49:23 2021 +0300

    IGNITE-15162 Thin 3.0: Add buffer pooling (#252)
    
    * Add server-side and client-side buffer pooling using Netty ByteBuf pooled allocator.
      * https://netty.io/wiki/using-as-a-generic-library.html
      * https://netty.io/wiki/reference-counted-objects.html
    * Prepend message length in `ClientMessagePacker` to avoid extra buffer allocation and get rid of `ClientMessageEncoder`.
    * Move request handling to separate classes (one class per message type).
    * Stop `ConfigurationRegistry` after tests.
---
 ...essageEncoder.java => ClientMessageCommon.java} |  30 +-
 .../ignite/client/proto/ClientMessageDecoder.java  |  99 ++---
 .../ignite/client/proto/ClientMessagePacker.java   |  63 +++-
 .../ignite/client/proto/ClientMessageUnpacker.java |  34 +-
 .../client/proto/ClientMessageDecoderTest.java     |  68 ++--
 .../client/proto/ClientMessageEncoderTest.java     |  53 ---
 .../proto/ClientMessagePackerUnpackerTest.java     |  66 +++-
 .../ignite/client/handler/ClientHandlerModule.java |   3 -
 .../handler/ClientInboundMessageHandler.java       | 415 +++++----------------
 .../requests/table/ClientSchemasGetRequest.java    |  76 ++++
 .../handler/requests/table/ClientTableCommon.java  | 274 ++++++++++++++
 .../requests/table/ClientTableDropRequest.java}    |  47 +--
 .../requests/table/ClientTableGetRequest.java}     |  56 +--
 .../requests/table/ClientTablesGetRequest.java}    |  59 ++-
 .../requests/table/ClientTupleGetRequest.java      |  53 +++
 .../requests/table/ClientTupleUpsertRequest.java}  |  48 +--
 .../table/ClientTupleUpsertSchemalessRequest.java} |  48 +--
 .../handler/requests/table/package-info.java}      |  17 +-
 .../handler/ClientHandlerIntegrationTest.java      |  10 +-
 .../internal/client/PayloadInputChannel.java       |   7 +-
 .../internal/client/PayloadOutputChannel.java      |   9 +-
 .../ignite/internal/client/TcpClientChannel.java   |  56 +--
 .../internal/client/io/ClientConnection.java       |  12 +-
 .../internal/client/io/ClientMessageHandler.java   |   4 +-
 .../client/io/netty/NettyClientConnection.java     |  12 +-
 .../io/netty/NettyClientConnectionMultiplexer.java |   3 -
 .../client/io/netty/NettyClientMessageHandler.java |   7 +-
 .../apache/ignite/client/AbstractClientTest.java   |  21 +-
 28 files changed, 873 insertions(+), 777 deletions(-)

diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageEncoder.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageCommon.java
similarity index 53%
rename from modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageEncoder.java
rename to modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageCommon.java
index a2dc56a..ae3411e 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageEncoder.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageCommon.java
@@ -17,31 +17,13 @@
 
 package org.apache.ignite.client.proto;
 
-import java.nio.ByteBuffer;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToByteEncoder;
-
 /**
- * Encodes client messages:
- * 1. MAGIC for first message.
- * 2. Payload length (varint).
- * 3. Payload (bytes).
+ * Common client message constants.
  */
-public class ClientMessageEncoder extends MessageToByteEncoder<ByteBuffer> {
-    /** Magic encoded flag. */
-    private boolean magicEncoded;
-
-    /** {@inheritDoc} */
-    @Override protected void encode(ChannelHandlerContext ctx, ByteBuffer message, ByteBuf out) {
-        if (!magicEncoded) {
-            out.writeBytes(ClientMessageDecoder.MAGIC_BYTES);
-
-            magicEncoded = true;
-        }
+public class ClientMessageCommon {
+    /** Message header size. */
+    public static final int HEADER_SIZE = 4;
 
-        out.writeInt(message.remaining());
-        out.writeBytes(message);
-    }
+    /** Magic bytes before handshake. */
+    public static final byte[] MAGIC_BYTES = new byte[]{0x49, 0x47, 0x4E, 0x49}; // IGNI
 }
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageDecoder.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageDecoder.java
index df42f40..a149cbb 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageDecoder.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageDecoder.java
@@ -17,48 +17,43 @@
 
 package org.apache.ignite.client.proto;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.List;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.util.CharsetUtil;
 import org.apache.ignite.lang.IgniteException;
 
+import static org.apache.ignite.client.proto.ClientMessageCommon.HEADER_SIZE;
+import static org.apache.ignite.client.proto.ClientMessageCommon.MAGIC_BYTES;
+
 /**
  * Decodes full client messages:
  * 1. MAGIC for first message.
- * 2. Payload length (varint).
- * 3. Payload (bytes).
+ * 2. Payload length (4 bytes).
+ * 3. Payload (N bytes).
  */
-public class ClientMessageDecoder extends ByteToMessageDecoder {
-    /** Magic bytes before handshake. */
-    public static final byte[] MAGIC_BYTES = new byte[]{0x49, 0x47, 0x4E, 0x49}; // IGNI
-
-    /** Data buffer. */
-    private byte[] data = new byte[4]; // TODO: Pooled buffers IGNITE-15162.
-
-    /** Remaining byte count. */
-    private int cnt = -4;
-
-    /** Message size. */
-    private int msgSize = -1;
-
+public class ClientMessageDecoder extends LengthFieldBasedFrameDecoder {
     /** Magic decoded flag. */
     private boolean magicDecoded;
 
     /** Magic decoding failed flag. */
     private boolean magicFailed;
 
+    /**
+     * Constructor.
+     */
+    public ClientMessageDecoder() {
+        // TODO: Configurable maximum frame length IGNITE-15164.
+        super(Integer.MAX_VALUE - HEADER_SIZE, 0, HEADER_SIZE, 0, HEADER_SIZE, true);
+    }
+
     /** {@inheritDoc} */
-    @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) {
-        if (!readMagic(byteBuf))
-            return;
+    @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+        if (!readMagic(in))
+            return null;
 
-        while (read(byteBuf))
-            list.add(ByteBuffer.wrap(data));
+        return super.decode(ctx, in);
     }
 
     /**
@@ -78,13 +73,10 @@ public class ClientMessageDecoder extends ByteToMessageDecoder {
         if (byteBuf.readableBytes() < MAGIC_BYTES.length)
             return false;
 
-        assert data.length == MAGIC_BYTES.length;
-
-        byteBuf.readBytes(data, 0, MAGIC_BYTES.length);
+        var data = new byte[MAGIC_BYTES.length];
+        byteBuf.readBytes(data);
 
         magicDecoded = true;
-        cnt = -1;
-        msgSize = 0;
 
         if (Arrays.equals(data, MAGIC_BYTES))
             return true;
@@ -94,53 +86,4 @@ public class ClientMessageDecoder extends ByteToMessageDecoder {
         throw new IgniteException("Invalid magic header in thin client connection. " +
                 "Expected 'IGNI', but was '" + new String(data, CharsetUtil.US_ASCII) + "'.");
     }
-
-    /**
-     * Reads the buffer.
-     *
-     * @param buf Buffer.
-     * @return True when a complete message has been received; false otherwise.
-     * @throws IgniteException when message is invalid.
-     */
-    private boolean read(ByteBuf buf) {
-        if (buf.readableBytes() == 0)
-            return false;
-
-        if (cnt < 0) {
-            if (buf.readableBytes() < 4)
-                return false;
-
-            msgSize = buf.readInt();
-
-            assert msgSize >= 0;
-            data = new byte[msgSize];
-            cnt = 0;
-        }
-
-        assert data != null;
-        assert msgSize >= 0;
-
-        int remaining = buf.readableBytes();
-
-        if (remaining > 0) {
-            int missing = msgSize - cnt;
-
-            if (missing > 0) {
-                int len = Math.min(missing, remaining);
-
-                buf.readBytes(data, cnt, len);
-
-                cnt += len;
-            }
-        }
-
-        if (cnt == msgSize) {
-            cnt = -1;
-            msgSize = -1;
-
-            return true;
-        }
-
-        return false;
-    }
 }
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java
index 98fa6ce..9cc79c4 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java
@@ -22,21 +22,57 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.UUID;
-
-import org.msgpack.core.MessageBufferPacker;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import org.apache.ignite.lang.IgniteException;
 import org.msgpack.core.MessagePack;
-import org.msgpack.core.buffer.ArrayBufferOutput;
+import org.msgpack.core.MessagePacker;
+import org.msgpack.core.buffer.OutputStreamBufferOutput;
+
+import static org.apache.ignite.client.proto.ClientMessageCommon.HEADER_SIZE;
 
 /**
- * Ignite-specific MsgPack extension.
+ * Ignite-specific MsgPack extension based on Netty ByteBuf.
+ * <p>
+ * Releases wrapped buffer on {@link #close()}.
  */
-public class ClientMessagePacker extends MessageBufferPacker {
+public class ClientMessagePacker extends MessagePacker {
+    /** Underlying buffer. */
+    private final ByteBuf buf;
+
+    /** Closed flag. */
+    private boolean closed = false;
+
     /**
      * Constructor.
+     *
+     * @param buf Buffer.
      */
-    public ClientMessagePacker() {
-        // TODO: Pooled buffers IGNITE-15162.
-        super(new ArrayBufferOutput(), MessagePack.DEFAULT_PACKER_CONFIG);
+    public ClientMessagePacker(ByteBuf buf) {
+        // Reserve 4 bytes for the message length.
+        super(new OutputStreamBufferOutput(new ByteBufOutputStream(buf.writerIndex(HEADER_SIZE))),
+                MessagePack.DEFAULT_PACKER_CONFIG);
+
+        this.buf = buf;
+    }
+
+    /**
+     * Gets the underlying buffer.
+     *
+     * @return Underlying buffer.
+     * @throws IgniteException When flush fails.
+     */
+    public ByteBuf getBuffer() {
+        try {
+            flush();
+        }
+        catch (IOException e) {
+            throw new IgniteException(e);
+        }
+
+        buf.setInt(0, buf.writerIndex() - HEADER_SIZE);
+
+        return buf;
     }
 
     /**
@@ -122,4 +158,15 @@ public class ClientMessagePacker extends MessageBufferPacker {
         // TODO: Support all basic types IGNITE-15163
         throw new IOException("Unsupported type, can't serialize: " + val.getClass());
     }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (closed)
+            return;
+
+        closed = true;
+
+        if (buf.refCnt() > 0)
+            buf.release();
+    }
 }
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java
index e8230e8..6a2c76a 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java
@@ -22,13 +22,14 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.UUID;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
 import org.apache.ignite.lang.IgniteException;
 import org.msgpack.core.MessagePack;
 import org.msgpack.core.MessageSizeException;
 import org.msgpack.core.MessageTypeException;
 import org.msgpack.core.MessageUnpacker;
-import org.msgpack.core.buffer.MessageBufferInput;
+import org.msgpack.core.buffer.InputStreamBufferInput;
 
 import static org.apache.ignite.client.proto.ClientDataType.BITMASK;
 import static org.apache.ignite.client.proto.ClientDataType.BYTES;
@@ -42,16 +43,26 @@ import static org.apache.ignite.client.proto.ClientDataType.INT8;
 import static org.apache.ignite.client.proto.ClientDataType.STRING;
 
 /**
- * Ignite-specific MsgPack extension.
+ * Ignite-specific MsgPack extension based on Netty ByteBuf.
+ * <p>
+ * Releases wrapped buffer on {@link #close()}.
  */
 public class ClientMessageUnpacker extends MessageUnpacker {
+    /** Underlying buffer. */
+    private final ByteBuf buf;
+
+    /** Closed flag. */
+    private boolean closed = false;
+
     /**
      * Constructor.
      *
-     * @param in Input.
+     * @param buf Input.
      */
-    public ClientMessageUnpacker(MessageBufferInput in) {
-        super(in, MessagePack.DEFAULT_UNPACKER_CONFIG);
+    public ClientMessageUnpacker(ByteBuf buf) {
+        super(new InputStreamBufferInput(new ByteBufInputStream(buf)), MessagePack.DEFAULT_UNPACKER_CONFIG);
+
+        this.buf = buf;
     }
 
     /**
@@ -152,4 +163,15 @@ public class ClientMessageUnpacker extends MessageUnpacker {
 
         throw new IgniteException("Unknown client data type: " + dataType);
     }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (closed)
+            return;
+
+        closed = true;
+
+        if (buf.refCnt() > 0)
+            buf.release();
+    }
 }
diff --git a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageDecoderTest.java b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageDecoderTest.java
index 9276102..f2497a6 100644
--- a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageDecoderTest.java
+++ b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageDecoderTest.java
@@ -17,16 +17,14 @@
 
 package org.apache.ignite.client.proto;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import org.apache.ignite.client.proto.ClientMessageDecoder;
 import org.apache.ignite.lang.IgniteException;
 import org.junit.jupiter.api.Test;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
@@ -35,67 +33,33 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 public class ClientMessageDecoderTest {
     @Test
     void testEmptyBufferReturnsNoResults() throws Exception {
-        var buf = new byte[0];
-        var res = new ArrayList<>();
-
-        new ClientMessageDecoder().decode(null, Unpooled.wrappedBuffer(buf), res);
+        var res = decode(new byte[0]);
 
-        assertEquals(0, res.size());
+        assertNull(res);
     }
 
     @Test
-    void testValidMagicAndMessageReturnsPayload() {
-        var res = new ArrayList<>();
-        new ClientMessageDecoder().decode(null, Unpooled.wrappedBuffer(getMagicWithPayload()), res);
-
-        assertEquals(1, res.size());
+    void testValidMagicAndMessageReturnsPayload() throws Exception {
+        byte[] bytes = decode(getMagicWithPayload());
 
-        var resBuf = (ByteBuffer)res.get(0);
-        assertArrayEquals(new byte[]{33, 44}, resBuf.array());
+        assertArrayEquals(new byte[]{33, 44}, bytes);
     }
 
     @Test
     void testInvalidMagicThrowsException() {
         byte[] buf = {66, 69, 69, 70, 1, 2, 3};
 
-        var t = assertThrows(IgniteException.class,
-                () -> new ClientMessageDecoder().decode(null, Unpooled.wrappedBuffer(buf), new ArrayList<>()));
+        var t = assertThrows(IgniteException.class, () -> decode(buf));
 
         assertEquals("Invalid magic header in thin client connection. Expected 'IGNI', but was 'BEEF'.",
                 t.getMessage());
     }
 
-    /**
-     * Tests multipart buffer arrival: socket can split incoming stream into arbitrary chunks.
-     */
-    @Test
-    void testMultipartValidMagicAndMessageReturnsPayload() throws Exception {
-        var decoder = new ClientMessageDecoder();
-        var res = new ArrayList<>();
-
-        byte[] data = getMagicWithPayload();
-
-        decoder.decode(null, Unpooled.wrappedBuffer(data, 0, 4), res);
-        assertEquals(0, res.size());
-
-        decoder.decode(null, Unpooled.wrappedBuffer(data, 4, 4), res);
-        assertEquals(0, res.size());
-
-        decoder.decode(null, Unpooled.wrappedBuffer(data, 8, 1), res);
-        assertEquals(0, res.size());
-
-        decoder.decode(null, Unpooled.wrappedBuffer(data, 9, 1), res);
-        assertEquals(1, res.size());
-
-        var resBuf = (ByteBuffer) res.get(0);
-        assertArrayEquals(new byte[]{33, 44}, resBuf.array());
-    }
-
-    private byte[] getMagicWithPayload() {
+    private static byte[] getMagicWithPayload() {
         var buf = new byte[10];
 
         // Magic.
-        System.arraycopy(ClientMessageDecoder.MAGIC_BYTES, 0, buf, 0, 4);
+        System.arraycopy(ClientMessageCommon.MAGIC_BYTES, 0, buf, 0, 4);
 
         // Message size.
         buf[7] = 2;
@@ -106,4 +70,16 @@ public class ClientMessageDecoderTest {
 
         return buf;
     }
+
+    private static byte[] decode(byte[] request) throws Exception {
+        var resBuf = (ByteBuf)new ClientMessageDecoder().decode(null, Unpooled.wrappedBuffer(request));
+
+        if (resBuf == null)
+            return null;
+
+        var bytes = new byte[resBuf.readableBytes()];
+        resBuf.readBytes(bytes);
+
+        return bytes;
+    }
 }
diff --git a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageEncoderTest.java b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageEncoderTest.java
deleted file mode 100644
index f07bb1f..0000000
--- a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageEncoderTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.proto;
-
-import io.netty.buffer.Unpooled;
-import org.apache.ignite.client.proto.ClientMessageEncoder;
-import org.junit.jupiter.api.Test;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-
-/**
- * Message encoding tests.
- */
-public class ClientMessageEncoderTest {
-    @Test
-    public void testEncodeIncludesMagicWithFirstMessage() {
-        var encoder = new ClientMessageEncoder();
-
-        byte[] res = encode(new byte[]{1, 2}, encoder);
-        assertArrayEquals(new byte[]{0x49, 0x47, 0x4E, 0x49, 0, 0, 0, 2, 1, 2}, res);
-
-        byte[] res2 = encode(new byte[]{7, 8, 9}, encoder);
-        assertArrayEquals(new byte[]{0, 0, 0, 3, 7, 8, 9}, res2);
-
-        byte[] res3 = encode(new byte[0], encoder);
-        assertArrayEquals(new byte[]{0, 0, 0, 0}, res3);
-    }
-
-    private byte[] encode(byte[] array, ClientMessageEncoder encoder) {
-        var target = Unpooled.buffer(100);
-        encoder.encode(null, ByteBuffer.wrap(array), target);
-
-        return Arrays.copyOf(target.array(), target.writerIndex());
-    }
-}
diff --git a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java
index fefb14d..3d04434 100644
--- a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java
+++ b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java
@@ -17,13 +17,11 @@
 
 package org.apache.ignite.client.proto;
 
-import org.apache.ignite.client.proto.ClientMessagePacker;
-import org.apache.ignite.client.proto.ClientMessageUnpacker;
-import org.junit.jupiter.api.Test;
-import org.msgpack.core.buffer.ArrayBufferInput;
-
 import java.io.IOException;
 import java.util.UUID;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -32,19 +30,65 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  */
 public class ClientMessagePackerUnpackerTest {
     @Test
+    public void testPackerCloseReleasesPooledBuffer() {
+        var buf = PooledByteBufAllocator.DEFAULT.directBuffer();
+        var packer = new ClientMessagePacker(buf);
+
+        assertEquals(1, buf.refCnt());
+
+        packer.close();
+
+        assertEquals(0, buf.refCnt());
+    }
+
+    @Test
+    public void testPackerIncludesFourByteMessageLength() throws IOException {
+        try (var packer = new ClientMessagePacker(PooledByteBufAllocator.DEFAULT.directBuffer())) {
+            packer.packInt(1); // 1 byte
+            packer.packString("Foo"); // 4 bytes
+
+            var buf = packer.getBuffer();
+            var len = buf.readInt();
+
+            assertEquals(5, len);
+            assertEquals(9, buf.writerIndex());
+            assertEquals(Integer.MAX_VALUE, buf.maxCapacity());
+        }
+    }
+
+    @Test
+    public void testEmptyPackerReturnsFourZeroBytes() {
+        try (var packer = new ClientMessagePacker(PooledByteBufAllocator.DEFAULT.directBuffer())) {
+            var buf = packer.getBuffer();
+            var len = buf.readInt();
+
+            assertEquals(0, len);
+            assertEquals(4, buf.writerIndex());
+        }
+    }
+
+    @Test
     public void testUUID() throws IOException {
         testUUID(UUID.randomUUID());
         testUUID(new UUID(0, 0));
     }
 
     private void testUUID(UUID u) throws IOException {
-        var packer = new ClientMessagePacker();
-        packer.packUuid(u);
-        byte[] data = packer.toByteArray();
+        try (var packer = new ClientMessagePacker(PooledByteBufAllocator.DEFAULT.directBuffer())) {
+            packer.packUuid(u);
+
+            var buf = packer.getBuffer();
+            var len = buf.readInt();
+
+            byte[] data = new byte[buf.readableBytes()];
+            buf.readBytes(data);
 
-        var unpacker = new ClientMessageUnpacker(new ArrayBufferInput(data));
-        var res = unpacker.unpackUuid();
+            try (var unpacker = new ClientMessageUnpacker(Unpooled.wrappedBuffer(data))) {
+                var res = unpacker.unpackUuid();
 
-        assertEquals(u, res);
+                assertEquals(18, len); // 1 ext + 1 ext type + 16 UUID data
+                assertEquals(u, res);
+            }
+        }
     }
 }
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 18791f8..59ff857 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.client.handler;
 
 import java.net.BindException;
-
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -30,7 +29,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.ignite.app.Ignite;
 import org.apache.ignite.client.proto.ClientMessageDecoder;
-import org.apache.ignite.client.proto.ClientMessageEncoder;
 import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import org.apache.ignite.lang.IgniteException;
@@ -109,7 +107,6 @@ public class ClientHandlerModule {
                 protected void initChannel(Channel ch) {
                     ch.pipeline().addLast(
                             new ClientMessageDecoder(),
-                            new ClientMessageEncoder(),
                             new ClientInboundMessageHandler(ignite, log));
                 }
             })
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 1510691..b5436b2 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -18,35 +18,29 @@
 package org.apache.ignite.client.handler;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
 import java.util.BitSet;
-import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.ignite.app.Ignite;
-import org.apache.ignite.client.proto.ClientDataType;
+import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTableDropRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTablesGetRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleGetRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertSchemalessRequest;
 import org.apache.ignite.client.proto.ClientErrorCode;
+import org.apache.ignite.client.proto.ClientMessageCommon;
 import org.apache.ignite.client.proto.ClientMessagePacker;
 import org.apache.ignite.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.client.proto.ClientOp;
 import org.apache.ignite.client.proto.ProtocolVersion;
 import org.apache.ignite.client.proto.ServerMessageType;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.NativeTypeSpec;
-import org.apache.ignite.internal.schema.SchemaAware;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.table.IgniteTablesInternal;
-import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.table.Table;
-import org.apache.ignite.table.Tuple;
-import org.apache.ignite.table.TupleBuilder;
-import org.msgpack.core.MessageFormat;
-import org.msgpack.core.buffer.ByteBufferInput;
 import org.slf4j.Logger;
 
 /**
@@ -78,21 +72,22 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
-        var buf = (ByteBuffer) msg;
-
-        var unpacker = getUnpacker(buf);
-        var packer = getPacker();
-
-        if (clientContext == null)
-            handshake(ctx, unpacker, packer);
-        else
-            processOperation(ctx, unpacker, packer);
+    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        // Each inbound handler in a pipeline has to release the received messages.
+        try (var unpacker = getUnpacker((ByteBuf) msg)) {
+            // Packer buffer is released by Netty on send, or by inner exception handlers below.
+            var packer = getPacker(ctx.alloc());
+
+            if (clientContext == null)
+                handshake(ctx, unpacker, packer);
+            else
+                processOperation(ctx, unpacker, packer);
+        }
     }
 
-    private void handshake(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer)
-            throws IOException {
+    private void handshake(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer) {
         try {
+            writeMagic(ctx);
             var clientVer = ProtocolVersion.unpack(unpacker);
 
             if (!clientVer.equals(ProtocolVersion.LATEST_VER))
@@ -120,28 +115,42 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
             write(packer, ctx);
         }
         catch (Throwable t) {
-            packer = getPacker();
+            packer.close();
 
-            ProtocolVersion.LATEST_VER.pack(packer);
-            packer.packInt(ClientErrorCode.FAILED).packString(t.getMessage());
+            var errPacker = getPacker(ctx.alloc());
 
-            write(packer, ctx);
+            try {
+                ProtocolVersion.LATEST_VER.pack(errPacker);
+                errPacker.packInt(ClientErrorCode.FAILED).packString(t.getMessage());
+
+                write(errPacker, ctx);
+            }
+            catch (Throwable t2) {
+                errPacker.close();
+                exceptionCaught(ctx, t2);
+            }
         }
     }
 
+    private void writeMagic(ChannelHandlerContext ctx) {
+        ctx.write(Unpooled.wrappedBuffer(ClientMessageCommon.MAGIC_BYTES));
+    }
+
     private void write(ClientMessagePacker packer, ChannelHandlerContext ctx) {
-        var buf = packer.toMessageBuffer().sliceAsByteBuffer();
+        var buf = packer.getBuffer();
 
+        // writeAndFlush releases pooled buffer.
         ctx.writeAndFlush(buf);
     }
 
-    private void writeError(int requestId, Throwable err, ChannelHandlerContext ctx) {
+    private void writeError(long requestId, Throwable err, ChannelHandlerContext ctx) {
+        var packer = getPacker(ctx.alloc());
+
         try {
             assert err != null;
 
-            ClientMessagePacker packer = getPacker();
             packer.packInt(ServerMessageType.RESPONSE);
-            packer.packInt(requestId);
+            packer.packLong(requestId);
             packer.packInt(ClientErrorCode.FAILED);
 
             String msg = err.getMessage();
@@ -154,329 +163,87 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
             write(packer, ctx);
         }
         catch (Throwable t) {
+            packer.close();
             exceptionCaught(ctx, t);
         }
     }
 
-    private ClientMessagePacker getPacker() {
-        return new ClientMessagePacker();
+    private ClientMessagePacker getPacker(ByteBufAllocator alloc) {
+        // Outgoing messages are released on write.
+        return new ClientMessagePacker(alloc.buffer());
     }
 
-    private ClientMessageUnpacker getUnpacker(ByteBuffer buf) {
-        return new ClientMessageUnpacker(new ByteBufferInput(buf));
+    private ClientMessageUnpacker getUnpacker(ByteBuf buf) {
+        return new ClientMessageUnpacker(buf);
     }
 
-    private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer) throws IOException {
-        var opCode = unpacker.unpackInt();
-        var requestId = unpacker.unpackInt();
-
-        packer.packInt(ServerMessageType.RESPONSE)
-                .packInt(requestId)
-                .packInt(ClientErrorCode.SUCCESS);
+    private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker in, ClientMessagePacker out) {
+        long requestId = -1;
 
         try {
-            var fut = processOperation(unpacker, packer, opCode);
+            var opCode = in.unpackInt();
+            requestId = in.unpackLong();
+
+            out.packInt(ServerMessageType.RESPONSE)
+                    .packLong(requestId)
+                    .packInt(ClientErrorCode.SUCCESS);
+
+            var fut = processOperation(in, out, opCode);
 
             if (fut == null) {
                 // Operation completed synchronously.
-                write(packer, ctx);
+                write(out, ctx);
             }
             else {
+                var reqId = requestId;
+
                 fut.whenComplete((Object res, Object err) -> {
-                    if (err != null)
-                        writeError(requestId, (Throwable) err, ctx);
-                    else
-                        write(packer, ctx);
+                    if (err != null) {
+                        out.close();
+                        writeError(reqId, (Throwable) err, ctx);
+                    } else
+                        write(out, ctx);
                 });
             }
-
         }
         catch (Throwable t) {
+            out.close();
+
             writeError(requestId, t, ctx);
         }
     }
 
-    private CompletableFuture processOperation(ClientMessageUnpacker unpacker, ClientMessagePacker packer, int opCode)
-            throws IOException {
+    private CompletableFuture processOperation(
+            ClientMessageUnpacker in,
+            ClientMessagePacker out,
+            int opCode
+    ) throws IOException {
         // TODO: Handle all operations asynchronously (add async table API).
         switch (opCode) {
-            case ClientOp.TABLE_DROP: {
-                var tableName = unpacker.unpackString();
-
-                ignite.tables().dropTable(tableName);
-
-                break;
-            }
-
-            case ClientOp.TABLES_GET: {
-                List<Table> tables = ignite.tables().tables();
-
-                packer.packMapHeader(tables.size());
-
-                for (var table : tables) {
-                    var tableImpl = (TableImpl) table;
-
-                    packer.packUuid(tableImpl.tableId());
-                    packer.packString(table.tableName());
-                }
-
-                break;
-            }
-
-            case ClientOp.SCHEMAS_GET: {
-                var table = readTable(unpacker);
-
-                if (unpacker.getNextFormat() == MessageFormat.NIL) {
-                    // Return the latest schema.
-                    packer.packMapHeader(1);
+            case ClientOp.TABLE_DROP:
+                return ClientTableDropRequest.process(in, ignite.tables());
 
-                    var schema = table.schemaView().schema();
+            case ClientOp.TABLES_GET:
+                return ClientTablesGetRequest.process(out, ignite.tables());
 
-                    if (schema == null)
-                        throw new IgniteException("Schema registry is not initialized.");
+            case ClientOp.SCHEMAS_GET:
+                return ClientSchemasGetRequest.process(in, out, ignite.tables());
 
-                    writeSchema(packer, schema.version(), schema);
-                }
-                else {
-                    var cnt = unpacker.unpackArrayHeader();
-                    packer.packMapHeader(cnt);
+            case ClientOp.TABLE_GET:
+                return ClientTableGetRequest.process(in, out, ignite.tables());
 
-                    for (var i = 0; i < cnt; i++) {
-                        var schemaVer = unpacker.unpackInt();
-                        var schema = table.schemaView().schema(schemaVer);
-                        writeSchema(packer, schemaVer, schema);
-                    }
-                }
+            case ClientOp.TUPLE_UPSERT:
+                return ClientTupleUpsertRequest.process(in, ignite.tables());
 
-                break;
-            }
-
-            case ClientOp.TABLE_GET: {
-                String tableName = unpacker.unpackString();
-                Table table = ignite.tables().table(tableName);
-
-                if (table == null)
-                    packer.packNil();
-                else
-                    packer.packUuid(((TableImpl) table).tableId());
-
-                break;
-            }
-
-            case ClientOp.TUPLE_UPSERT: {
-                var table = readTable(unpacker);
-                var tuple = readTuple(unpacker, table, false);
-
-                return table.upsertAsync(tuple);
-            }
+            case ClientOp.TUPLE_UPSERT_SCHEMALESS:
+                return ClientTupleUpsertSchemalessRequest.process(in, ignite.tables());
 
-            case ClientOp.TUPLE_UPSERT_SCHEMALESS: {
-                var table = readTable(unpacker);
-                var tuple = readTupleSchemaless(unpacker, table);
-
-                return table.upsertAsync(tuple);
-            }
-
-            case ClientOp.TUPLE_GET: {
-                var table = readTable(unpacker);
-                var keyTuple = readTuple(unpacker, table, true);
-
-                return table.getAsync(keyTuple).thenAccept(t -> writeTuple(packer, t));
-            }
+            case ClientOp.TUPLE_GET:
+                return ClientTupleGetRequest.process(in, out, ignite.tables());
 
             default:
                 throw new IgniteException("Unexpected operation code: " + opCode);
         }
-
-        return null;
-    }
-
-    private void writeSchema(ClientMessagePacker packer, int schemaVer, SchemaDescriptor schema) throws IOException {
-        packer.packInt(schemaVer);
-
-        if (schema == null) {
-            packer.packNil();
-
-            return;
-        }
-
-        var colCnt = schema.columnNames().size();
-        packer.packArrayHeader(colCnt);
-
-        for (var colIdx = 0; colIdx < colCnt; colIdx++) {
-            var col = schema.column(colIdx);
-
-            packer.packArrayHeader(4);
-            packer.packString(col.name());
-            packer.packInt(getClientDataType(col.type().spec()));
-            packer.packBoolean(schema.isKeyColumn(colIdx));
-            packer.packBoolean(col.nullable());
-        }
-    }
-
-    private void writeTuple(ClientMessagePacker packer, Tuple tuple) {
-        try {
-            if (tuple == null) {
-                packer.packNil();
-
-                return;
-            }
-
-            var schema = ((SchemaAware) tuple).schema();
-
-            packer.packInt(schema.version());
-
-            for (var col : schema.keyColumns().columns())
-                writeColumnValue(packer, tuple, col);
-
-            for (var col : schema.valueColumns().columns())
-                writeColumnValue(packer, tuple, col);
-        }
-        catch (Throwable t) {
-            throw new IgniteException("Failed to serialize tuple", t);
-        }
-    }
-
-    private Tuple readTuple(ClientMessageUnpacker unpacker, TableImpl table, boolean keyOnly) throws IOException {
-        var schemaId = unpacker.unpackInt();
-        var schema = table.schemaView().schema(schemaId);
-        var builder = table.tupleBuilder();
-
-        var cnt = keyOnly ? schema.keyColumns().length() : schema.length();
-
-        for (int i = 0; i < cnt; i++) {
-            if (unpacker.getNextFormat() == MessageFormat.NIL) {
-                unpacker.skipValue();
-                continue;
-            }
-
-            readAndSetColumnValue(unpacker, builder, schema.column(i));
-        }
-
-        return builder.build();
-    }
-
-    private Tuple readTupleSchemaless(ClientMessageUnpacker unpacker, TableImpl table) throws IOException {
-        var cnt = unpacker.unpackMapHeader();
-        var builder = table.tupleBuilder();
-
-        for (int i = 0; i < cnt; i++) {
-            var colName = unpacker.unpackString();
-
-            builder.set(colName, unpacker.unpackValue());
-        }
-
-        return builder.build();
-    }
-
-    private TableImpl readTable(ClientMessageUnpacker unpacker) throws IOException {
-        var tableId = unpacker.unpackUuid();
-
-        return ((IgniteTablesInternal)ignite.tables()).table(tableId);
-    }
-
-    private void readAndSetColumnValue(ClientMessageUnpacker unpacker, TupleBuilder builder, Column col)
-            throws IOException {
-        builder.set(col.name(), unpacker.unpackObject(getClientDataType(col.type().spec())));
-    }
-
-    private static int getClientDataType(NativeTypeSpec spec) {
-        switch (spec) {
-            case INT8:
-                return ClientDataType.INT8;
-
-            case INT16:
-                return ClientDataType.INT16;
-
-            case INT32:
-                return ClientDataType.INT32;
-
-            case INT64:
-                return ClientDataType.INT64;
-
-            case FLOAT:
-                return ClientDataType.FLOAT;
-
-            case DOUBLE:
-                return ClientDataType.DOUBLE;
-
-            case DECIMAL:
-                return ClientDataType.DECIMAL;
-
-            case UUID:
-                return ClientDataType.UUID;
-
-            case STRING:
-                return ClientDataType.STRING;
-
-            case BYTES:
-                return ClientDataType.BYTES;
-
-            case BITMASK:
-                return ClientDataType.BITMASK;
-        }
-
-        throw new IgniteException("Unsupported native type: " + spec);
-    }
-
-    private void writeColumnValue(ClientMessagePacker packer, Tuple tuple, Column col) throws IOException {
-        var val = tuple.value(col.name());
-
-        if (val == null) {
-            packer.packNil();
-            return;
-        }
-
-        switch (col.type().spec()) {
-            case INT8:
-                packer.packByte((byte) val);
-                break;
-
-            case INT16:
-                packer.packShort((short) val);
-                break;
-
-            case INT32:
-                packer.packInt((int) val);
-                break;
-
-            case INT64:
-                packer.packLong((long) val);
-                break;
-
-            case FLOAT:
-                packer.packFloat((float) val);
-                break;
-
-            case DOUBLE:
-                packer.packDouble((double) val);
-                break;
-
-            case DECIMAL:
-                packer.packDecimal((BigDecimal) val);
-                break;
-
-            case UUID:
-                packer.packUuid((UUID) val);
-                break;
-
-            case STRING:
-                packer.packString((String) val);
-                break;
-
-            case BYTES:
-                byte[] bytes = (byte[]) val;
-                packer.packBinaryHeader(bytes.length);
-                packer.writePayload(bytes);
-                break;
-
-            case BITMASK:
-                packer.packBitSet((BitSet)val);
-                break;
-
-            default:
-                throw new IgniteException("Data type not supported: " + col.type());
-        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientSchemasGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientSchemasGetRequest.java
new file mode 100644
index 0000000..e89e68a
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientSchemasGetRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.proto.ClientMessagePacker;
+import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.msgpack.core.MessageFormat;
+
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeSchema;
+
+/**
+ * Client schemas retrieval request.
+ */
+public class ClientSchemasGetRequest {
+    /**
+     * Processes the request.
+     *
+     * @param in Unpacker.
+     * @param out Packer.
+     * @param tables Ignite tables.
+     * @return Future.
+     * @throws IOException On serialization error.
+     * @throws IgniteException When schema registry is not initialized.
+     */
+    public static CompletableFuture<Object> process(
+            ClientMessageUnpacker in,
+            ClientMessagePacker out,
+            IgniteTables tables
+    ) throws IOException {
+        var table = readTable(in, tables);
+
+        if (in.getNextFormat() == MessageFormat.NIL) {
+            // Return the latest schema.
+            out.packMapHeader(1);
+
+            var schema = table.schemaView().schema();
+
+            if (schema == null)
+                throw new IgniteException("Schema registry is not initialized.");
+
+            writeSchema(out, schema.version(), schema);
+        }
+        else {
+            var cnt = in.unpackArrayHeader();
+            out.packMapHeader(cnt);
+
+            for (var i = 0; i < cnt; i++) {
+                var schemaVer = in.unpackInt();
+                var schema = table.schemaView().schema(schemaVer);
+                writeSchema(out, schemaVer, schema);
+            }
+        }
+
+        return null;
+    }
+}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
new file mode 100644
index 0000000..74ef6c2
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.client.proto.ClientDataType;
+import org.apache.ignite.client.proto.ClientMessagePacker;
+import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaAware;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.TupleBuilder;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.msgpack.core.MessageFormat;
+
+/**
+ * Common table functionality.
+ */
+class ClientTableCommon {
+    /**
+     * Writes a schema.
+     *
+     * @param packer Packer.
+     * @param schemaVer Schema version.
+     * @param schema Schema.
+     * @throws IOException When serialization fails.
+     */
+    public static void writeSchema(
+            ClientMessagePacker packer,
+            int schemaVer,
+            SchemaDescriptor schema
+    ) throws IOException {
+        packer.packInt(schemaVer);
+
+        if (schema == null) {
+            packer.packNil();
+
+            return;
+        }
+
+        var colCnt = schema.columnNames().size();
+        packer.packArrayHeader(colCnt);
+
+        for (var colIdx = 0; colIdx < colCnt; colIdx++) {
+            var col = schema.column(colIdx);
+
+            packer.packArrayHeader(4);
+            packer.packString(col.name());
+            packer.packInt(getClientDataType(col.type().spec()));
+            packer.packBoolean(schema.isKeyColumn(colIdx));
+            packer.packBoolean(col.nullable());
+        }
+    }
+
+    /**
+     * Writes a tuple.
+     *
+     * @param packer Packer.
+     * @param tuple Tuple.
+     * @throws IgniteException When tuple serialization fails.
+     */
+    public static void writeTuple(ClientMessagePacker packer, Tuple tuple) {
+        try {
+            if (tuple == null) {
+                packer.packNil();
+
+                return;
+            }
+
+            var schema = ((SchemaAware) tuple).schema();
+
+            packer.packInt(schema.version());
+
+            for (var col : schema.keyColumns().columns())
+                writeColumnValue(packer, tuple, col);
+
+            for (var col : schema.valueColumns().columns())
+                writeColumnValue(packer, tuple, col);
+        }
+        catch (Throwable t) {
+            throw new IgniteException("Failed to serialize tuple", t);
+        }
+    }
+
+    /**
+     * Reads a tuple.
+     *
+     * @param unpacker Unpacker.
+     * @param table Table.
+     * @param keyOnly Whether only key fields are expected.
+     * @return Tuple.
+     * @throws IOException When deserialization fails.
+     */
+    public static Tuple readTuple(ClientMessageUnpacker unpacker, TableImpl table, boolean keyOnly) throws IOException {
+        var schemaId = unpacker.unpackInt();
+        var schema = table.schemaView().schema(schemaId);
+        var builder = table.tupleBuilder();
+
+        var cnt = keyOnly ? schema.keyColumns().length() : schema.length();
+
+        for (int i = 0; i < cnt; i++) {
+            if (unpacker.getNextFormat() == MessageFormat.NIL) {
+                unpacker.skipValue();
+                continue;
+            }
+
+            readAndSetColumnValue(unpacker, builder, schema.column(i));
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Reads a tuple as a map, without schema.
+     *
+     * @param unpacker Unpacker.
+     * @param table Table.
+     * @return Tuple.
+     * @throws IOException When deserialization fails.
+     */
+    public static Tuple readTupleSchemaless(ClientMessageUnpacker unpacker, TableImpl table) throws IOException {
+        var cnt = unpacker.unpackMapHeader();
+        var builder = table.tupleBuilder();
+
+        for (int i = 0; i < cnt; i++) {
+            var colName = unpacker.unpackString();
+
+            builder.set(colName, unpacker.unpackValue());
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Reads a table.
+     *
+     * @param unpacker Unpacker.
+     * @param tables Ignite tables.
+     * @return Table.
+     * @throws IOException When deserialization fails.
+     */
+    public static TableImpl readTable(ClientMessageUnpacker unpacker, IgniteTables tables) throws IOException {
+        var tableId = unpacker.unpackUuid();
+
+        return ((IgniteTablesInternal)tables).table(tableId);
+    }
+
+    private static void readAndSetColumnValue(ClientMessageUnpacker unpacker, TupleBuilder builder, Column col)
+            throws IOException {
+        builder.set(col.name(), unpacker.unpackObject(getClientDataType(col.type().spec())));
+    }
+
+    private static int getClientDataType(NativeTypeSpec spec) {
+        switch (spec) {
+            case INT8:
+                return ClientDataType.INT8;
+
+            case INT16:
+                return ClientDataType.INT16;
+
+            case INT32:
+                return ClientDataType.INT32;
+
+            case INT64:
+                return ClientDataType.INT64;
+
+            case FLOAT:
+                return ClientDataType.FLOAT;
+
+            case DOUBLE:
+                return ClientDataType.DOUBLE;
+
+            case DECIMAL:
+                return ClientDataType.DECIMAL;
+
+            case UUID:
+                return ClientDataType.UUID;
+
+            case STRING:
+                return ClientDataType.STRING;
+
+            case BYTES:
+                return ClientDataType.BYTES;
+
+            case BITMASK:
+                return ClientDataType.BITMASK;
+        }
+
+        throw new IgniteException("Unsupported native type: " + spec);
+    }
+
+    private static void writeColumnValue(ClientMessagePacker packer, Tuple tuple, Column col) throws IOException {
+        var val = tuple.value(col.name());
+
+        if (val == null) {
+            packer.packNil();
+            return;
+        }
+
+        switch (col.type().spec()) {
+            case INT8:
+                packer.packByte((byte) val);
+                break;
+
+            case INT16:
+                packer.packShort((short) val);
+                break;
+
+            case INT32:
+                packer.packInt((int) val);
+                break;
+
+            case INT64:
+                packer.packLong((long) val);
+                break;
+
+            case FLOAT:
+                packer.packFloat((float) val);
+                break;
+
+            case DOUBLE:
+                packer.packDouble((double) val);
+                break;
+
+            case DECIMAL:
+                packer.packDecimal((BigDecimal) val);
+                break;
+
+            case UUID:
+                packer.packUuid((UUID) val);
+                break;
+
+            case STRING:
+                packer.packString((String) val);
+                break;
+
+            case BYTES:
+                byte[] bytes = (byte[]) val;
+                packer.packBinaryHeader(bytes.length);
+                packer.writePayload(bytes);
+                break;
+
+            case BITMASK:
+                packer.packBitSet((BitSet)val);
+                break;
+
+            default:
+                throw new IgniteException("Data type not supported: " + col.type());
+        }
+    }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
similarity index 57%
copy from modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
index d5252fe..32b580e 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
@@ -15,46 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client;
+package org.apache.ignite.client.handler.requests.table;
 
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.table.manager.IgniteTables;
 
 /**
- * Thin client payload input channel.
+ * Client table drop request.
  */
-public class PayloadInputChannel {
-    /** Client channel. */
-    private final ClientChannel ch;
-
-    /** Input stream. */
-    private final ClientMessageUnpacker in;
-
+public class ClientTableDropRequest {
     /**
-     * Constructor.
+     * Processes the request.
      *
-     * @param ch Channel.
      * @param in Unpacker.
+     * @param tables Ignite tables.
+     * @return Future.
+     * @throws IOException On serialization error.
      */
-    PayloadInputChannel(ClientChannel ch, ClientMessageUnpacker in) {
-        this.in = in;
-        this.ch = ch;
-    }
+    public static CompletableFuture<Object> process(
+            ClientMessageUnpacker in,
+            IgniteTables tables
+    ) throws IOException {
+        var tableName = in.unpackString();
 
-    /**
-     * Gets client channel.
-     *
-     * @return Client channel.
-     */
-    public ClientChannel clientChannel() {
-        return ch;
-    }
+        tables.dropTable(tableName);
 
-    /**
-     * Gets the unpacker.
-     *
-     * @return Unpacker.
-     */
-    public ClientMessageUnpacker in() {
-        return in;
+        return null;
     }
 }
diff --git a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java
similarity index 50%
copy from modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java
index fefb14d..5e48025 100644
--- a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java
@@ -15,36 +15,42 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.client.proto;
+package org.apache.ignite.client.handler.requests.table;
 
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.proto.ClientMessagePacker;
 import org.apache.ignite.client.proto.ClientMessageUnpacker;
-import org.junit.jupiter.api.Test;
-import org.msgpack.core.buffer.ArrayBufferInput;
-
-import java.io.IOException;
-import java.util.UUID;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.manager.IgniteTables;
 
 /**
- * Tests Ignite-specific MsgPack extensions.
+ * Client table retrieval request.
  */
-public class ClientMessagePackerUnpackerTest {
-    @Test
-    public void testUUID() throws IOException {
-        testUUID(UUID.randomUUID());
-        testUUID(new UUID(0, 0));
-    }
-
-    private void testUUID(UUID u) throws IOException {
-        var packer = new ClientMessagePacker();
-        packer.packUuid(u);
-        byte[] data = packer.toByteArray();
-
-        var unpacker = new ClientMessageUnpacker(new ArrayBufferInput(data));
-        var res = unpacker.unpackUuid();
-
-        assertEquals(u, res);
+public class ClientTableGetRequest {
+    /**
+     * Processes the request.
+     *
+     * @param in Unpacker.
+     * @param out Packer.
+     * @param tables Ignite tables.
+     * @return Future.
+     * @throws IOException On serialization error.
+     */
+    public static CompletableFuture<Object> process(
+            ClientMessageUnpacker in,
+            ClientMessagePacker out,
+            IgniteTables tables
+    ) throws IOException {
+        String tableName = in.unpackString();
+        Table table = tables.table(tableName);
+
+        if (table == null)
+            out.packNil();
+        else
+            out.packUuid(((TableImpl) table).tableId());
+
+        return null;
     }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java
similarity index 50%
copy from modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java
index 0db8c1c..a05dcd4 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java
@@ -15,52 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client;
+package org.apache.ignite.client.handler.requests.table;
 
 import java.io.IOException;
-
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.manager.IgniteTables;
 
 /**
- * Thin client payload output channel.
+ * Client tables retrieval request.
  */
-public class PayloadOutputChannel implements AutoCloseable {
-    /** Client channel. */
-    private final ClientChannel ch;
-
-    /** Output stream. */
-    private final ClientMessagePacker out;
-
+public class ClientTablesGetRequest {
     /**
-     * Constructor.
+     * Processes the request.
      *
-     * @param ch Channel.
+     * @param out Packer.
+     * @param igniteTables Ignite tables.
+     * @return Future.
+     * @throws IOException On serialization error.
      */
-    PayloadOutputChannel(ClientChannel ch) {
-        out = new ClientMessagePacker();
-        this.ch = ch;
-    }
+    public static CompletableFuture<Object> process(
+            ClientMessagePacker out,
+            IgniteTables igniteTables
+    ) throws IOException {
+        var tables = igniteTables.tables();
 
-    /**
-     * Gets client channel.
-     *
-     * @return Client channel.
-     */
-    public ClientChannel clientChannel() {
-        return ch;
-    }
+        out.packMapHeader(tables.size());
 
-    /**
-     * Gets the unpacker.
-     *
-     * @return Unpacker.
-     */
-    public ClientMessagePacker out() {
-        return out;
-    }
+        for (var table : tables) {
+            var tableImpl = (TableImpl) table;
+
+            out.packUuid(tableImpl.tableId());
+            out.packString(table.tableName());
+        }
 
-    /** {@inheritDoc} */
-    @Override public void close() throws IOException {
-        out.close();
+        return null;
     }
 }
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
new file mode 100644
index 0000000..bc3bce0
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.proto.ClientMessagePacker;
+import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.table.manager.IgniteTables;
+
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
+
+/**
+ * Client tuple get request: reads a key tuple from the request and writes the tuple returned by the table.
+ */
+public class ClientTupleGetRequest {
+    /**
+     * Processes the request.
+     *
+     * @param in Unpacker that the table and the key tuple are read from.
+     * @param out Packer that the tuple returned by {@code getAsync} is written to.
+     * @param tables Ignite tables, passed to {@code readTable} together with {@code in} to obtain the table.
+     * @return Future that completes after the result of {@code getAsync} has been passed to {@code writeTuple}.
+     * @throws IOException On serialization error.
+     */
+    public static CompletableFuture<Void> process(
+            ClientMessageUnpacker in,
+            ClientMessagePacker out,
+            IgniteTables tables
+    ) throws IOException {
+        var table = readTable(in, tables);
+        var keyTuple = readTuple(in, table, true);
+
+        return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+    }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
similarity index 52%
copy from modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
index d5252fe..8b98ea7 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
@@ -15,46 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client;
+package org.apache.ignite.client.handler.requests.table;
 
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.table.manager.IgniteTables;
+
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
 
 /**
- * Thin client payload input channel.
+ * Client tuple upsert request.
  */
-public class PayloadInputChannel {
-    /** Client channel. */
-    private final ClientChannel ch;
-
-    /** Input stream. */
-    private final ClientMessageUnpacker in;
-
+public class ClientTupleUpsertRequest {
     /**
-     * Constructor.
+     * Processes the request.
      *
-     * @param ch Channel.
      * @param in Unpacker.
+     * @param tables Ignite tables.
+     * @return Future.
+     * @throws IOException On serialization error.
      */
-    PayloadInputChannel(ClientChannel ch, ClientMessageUnpacker in) {
-        this.in = in;
-        this.ch = ch;
-    }
-
-    /**
-     * Gets client channel.
-     *
-     * @return Client channel.
-     */
-    public ClientChannel clientChannel() {
-        return ch;
-    }
+    public static CompletableFuture<Void> process(ClientMessageUnpacker in, IgniteTables tables) throws IOException {
+        var table = readTable(in, tables);
+        var tuple = readTuple(in, table, false);
 
-    /**
-     * Gets the unpacker.
-     *
-     * @return Unpacker.
-     */
-    public ClientMessageUnpacker in() {
-        return in;
+        return table.upsertAsync(tuple);
     }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertSchemalessRequest.java
similarity index 52%
copy from modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertSchemalessRequest.java
index d5252fe..2a0bbcc 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertSchemalessRequest.java
@@ -15,46 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client;
+package org.apache.ignite.client.handler.requests.table;
 
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.table.manager.IgniteTables;
+
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTupleSchemaless;
 
 /**
- * Thin client payload input channel.
+ * Client tuple upsert schemaless request.
  */
-public class PayloadInputChannel {
-    /** Client channel. */
-    private final ClientChannel ch;
-
-    /** Input stream. */
-    private final ClientMessageUnpacker in;
-
+public class ClientTupleUpsertSchemalessRequest {
     /**
-     * Constructor.
+     * Processes the request.
      *
-     * @param ch Channel.
      * @param in Unpacker.
+     * @param tables Ignite tables.
+     * @return Future.
+     * @throws IOException On serialization error.
      */
-    PayloadInputChannel(ClientChannel ch, ClientMessageUnpacker in) {
-        this.in = in;
-        this.ch = ch;
-    }
-
-    /**
-     * Gets client channel.
-     *
-     * @return Client channel.
-     */
-    public ClientChannel clientChannel() {
-        return ch;
-    }
+    public static CompletableFuture<Void> process(ClientMessageUnpacker in, IgniteTables tables) throws IOException {
+        var table = readTable(in, tables);
+        var tuple = readTupleSchemaless(in, table);
 
-    /**
-     * Gets the unpacker.
-     *
-     * @return Unpacker.
-     */
-    public ClientMessageUnpacker in() {
-        return in;
+        return table.upsertAsync(tuple);
     }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/package-info.java
similarity index 67%
copy from modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/package-info.java
index da437a3..a857f7a 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/package-info.java
@@ -15,20 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.io;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 /**
- * Handles thin client responses and server to client notifications.
+ * Table API requests.
  */
-public interface ClientMessageHandler {
-    /**
-     * Handles messages from the server.
-     *
-     * @param buf Buffer.
-     * @throws IOException on failure.
-     */
-    void onMessage(ByteBuffer buf) throws IOException;
-}
+package org.apache.ignite.client.handler.requests.table;
diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java
index ad482d2..67a8454 100644
--- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java
+++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.util.Collections;
-
 import io.netty.channel.ChannelFuture;
 import org.apache.ignite.app.Ignite;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
@@ -47,6 +46,8 @@ public class ClientHandlerIntegrationTest {
 
     private ChannelFuture serverFuture;
 
+    private ConfigurationRegistry configurationRegistry;
+
     @BeforeEach
     public void setUp() throws Exception {
         serverFuture = startServer();
@@ -56,6 +57,7 @@ public class ClientHandlerIntegrationTest {
     public void tearDown() throws Exception {
         serverFuture.cancel(true);
         serverFuture.await();
+        configurationRegistry.stop();
     }
 
     @Test
@@ -170,17 +172,17 @@ public class ClientHandlerIntegrationTest {
     }
 
     private ChannelFuture startServer() throws InterruptedException {
-        var registry = new ConfigurationRegistry(
+        configurationRegistry = new ConfigurationRegistry(
                 Collections.singletonList(ClientConnectorConfiguration.KEY),
                 Collections.emptyMap(),
                 Collections.singletonList(new TestConfigurationStorage(ConfigurationType.LOCAL))
         );
 
-        registry.start();
+        configurationRegistry.start();
 
         var module = new ClientHandlerModule(mock(Ignite.class), NOPLogger.NOP_LOGGER);
 
-        module.prepareStart(registry);
+        module.prepareStart(configurationRegistry);
 
         return module.start();
     }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
index d5252fe..c082e26 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
@@ -22,7 +22,7 @@ import org.apache.ignite.client.proto.ClientMessageUnpacker;
 /**
  * Thin client payload input channel.
  */
-public class PayloadInputChannel {
+public class PayloadInputChannel implements AutoCloseable {
     /** Client channel. */
     private final ClientChannel ch;
 
@@ -57,4 +57,9 @@ public class PayloadInputChannel {
     public ClientMessageUnpacker in() {
         return in;
     }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        in.close();
+    }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
index 0db8c1c..bf33bf6 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.client;
 
-import java.io.IOException;
-
 import org.apache.ignite.client.proto.ClientMessagePacker;
 
 /**
@@ -35,10 +33,11 @@ public class PayloadOutputChannel implements AutoCloseable {
      * Constructor.
      *
      * @param ch Channel.
+     * @param out Packer.
      */
-    PayloadOutputChannel(ClientChannel ch) {
-        out = new ClientMessagePacker();
+    PayloadOutputChannel(ClientChannel ch, ClientMessagePacker out) {
         this.ch = ch;
+        this.out = out;
     }
 
     /**
@@ -60,7 +59,7 @@ public class PayloadOutputChannel implements AutoCloseable {
     }
 
     /** {@inheritDoc} */
-    @Override public void close() throws IOException {
+    @Override public void close() {
         out.close();
     }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 50b7ede..cb535fb 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.client;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -30,13 +29,15 @@ import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import org.apache.ignite.client.IgniteClientAuthenticationException;
 import org.apache.ignite.client.IgniteClientAuthorizationException;
 import org.apache.ignite.client.IgniteClientConnectionException;
 import org.apache.ignite.client.IgniteClientException;
 import org.apache.ignite.client.proto.ClientErrorCode;
+import org.apache.ignite.client.proto.ClientMessageCommon;
 import org.apache.ignite.client.proto.ClientMessagePacker;
 import org.apache.ignite.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.client.proto.ProtocolVersion;
@@ -47,7 +48,6 @@ import org.apache.ignite.internal.client.io.ClientConnectionStateHandler;
 import org.apache.ignite.internal.client.io.ClientMessageHandler;
 import org.apache.ignite.lang.IgniteException;
 import org.jetbrains.annotations.Nullable;
-import org.msgpack.core.buffer.ByteBufferInput;
 
 /**
  * Implements {@link ClientChannel} over TCP.
@@ -106,7 +106,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     }
 
     /** {@inheritDoc} */
-    @Override public void onMessage(ByteBuffer buf) throws IOException {
+    @Override public void onMessage(ByteBuf buf) throws IOException {
         processNextMessage(buf);
     }
 
@@ -155,14 +155,16 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
             throws IgniteClientException {
         long id = reqId.getAndIncrement();
 
-        try (PayloadOutputChannel payloadCh = new PayloadOutputChannel(this)) {
-            if (closed())
-                throw new IgniteClientConnectionException("Channel is closed");
+        if (closed())
+            throw new IgniteClientConnectionException("Channel is closed");
+
+        ClientRequestFuture fut = new ClientRequestFuture();
 
-            ClientRequestFuture fut = new ClientRequestFuture();
+        pendingReqs.put(id, fut);
 
-            pendingReqs.put(id, fut);
+        PayloadOutputChannel payloadCh = new PayloadOutputChannel(this, new ClientMessagePacker(sock.getBuffer()));
 
+        try {
             var req = payloadCh.out();
 
             req.packInt(opCode);
@@ -177,8 +179,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
             });
 
             return fut;
-        }
-        catch (Throwable t) {
+        } catch (Throwable t) {
+            // Close the buffer manually on failure. A successful write closes the buffer automatically.
+            payloadCh.close();
             pendingReqs.remove(id);
 
             throw convertException(t);
@@ -197,8 +200,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
             if (payload == null || payloadReader == null)
                 return null;
 
-            try {
-                return payloadReader.apply(new PayloadInputChannel(this, payload));
+            try (var in = new PayloadInputChannel(this, payload)) {
+                return payloadReader.apply(in);
             } catch (Exception e) {
                 throw new IgniteException("Failed to deserialize server response: " + e.getMessage(), e);
             }
@@ -227,8 +230,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     /**
      * Process next message from the input stream and complete corresponding future.
      */
-    private void processNextMessage(ByteBuffer buf) throws IgniteClientException, IOException {
-        var unpacker = new ClientMessageUnpacker(new ByteBufferInput(buf));
+    private void processNextMessage(ByteBuf buf) throws IgniteClientException, IOException {
+        var unpacker = new ClientMessageUnpacker(buf);
 
         if (protocolCtx == null) {
             // Process handshake.
@@ -297,18 +300,19 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
 
     /** Send handshake request. */
     private void handshakeReq(ProtocolVersion proposedVer) throws IOException {
-        try (var req = new ClientMessagePacker()) {
-            req.packInt(proposedVer.major());
-            req.packInt(proposedVer.minor());
-            req.packInt(proposedVer.patch());
+        sock.send(Unpooled.wrappedBuffer(ClientMessageCommon.MAGIC_BYTES));
 
-            req.packInt(2); // Client type: general purpose.
+        var req = new ClientMessagePacker(sock.getBuffer());
+        req.packInt(proposedVer.major());
+        req.packInt(proposedVer.minor());
+        req.packInt(proposedVer.patch());
 
-            req.packBinaryHeader(0); // Features.
-            req.packMapHeader(0); // Extensions.
+        req.packInt(2); // Client type: general purpose.
 
-            write(req).syncUninterruptibly();
-        }
+        req.packBinaryHeader(0); // Features.
+        req.packMapHeader(0); // Extensions.
+
+        write(req).syncUninterruptibly();
     }
 
     /**
@@ -322,7 +326,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     /** Receive and handle handshake response. */
     private void handshakeRes(ClientMessageUnpacker unpacker, ProtocolVersion proposedVer)
             throws IgniteClientConnectionException, IgniteClientAuthenticationException {
-        try {
+        try (unpacker) {
             ProtocolVersion srvVer = new ProtocolVersion(unpacker.unpackShort(), unpacker.unpackShort(),
                     unpacker.unpackShort());
 
@@ -363,7 +367,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
 
     /** Write bytes to the output stream. */
     private ChannelFuture write(ClientMessagePacker packer) throws IgniteClientConnectionException {
-        var buf = packer.toMessageBuffer().sliceAsByteBuffer();
+        var buf = packer.getBuffer();
 
         return sock.send(buf);
     }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java
index 8e25329..67ea66e 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java
@@ -17,8 +17,7 @@
 
 package org.apache.ignite.internal.client.io;
 
-import java.nio.ByteBuffer;
-
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
 import org.apache.ignite.lang.IgniteException;
 
@@ -32,7 +31,14 @@ public interface ClientConnection extends AutoCloseable {
      * @param msg Message buffer.
      * @return Future for the operation.
      */
-    ChannelFuture send(ByteBuffer msg) throws IgniteException;
+    ChannelFuture send(ByteBuf msg) throws IgniteException;
+
+    /**
+     * Gets a buffer to write to.
+     *
+     * @return Buffer.
+     */
+    ByteBuf getBuffer();
 
     /**
      * Closes the connection.
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
index da437a3..dc93455 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.client.io;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
 
 /**
  * Handles thin client responses and server to client notifications.
@@ -30,5 +30,5 @@ public interface ClientMessageHandler {
      * @param buf Buffer.
      * @throws IOException on failure.
      */
-    void onMessage(ByteBuffer buf) throws IOException;
+    void onMessage(ByteBuf buf) throws IOException;
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
index f5955aa..6448460 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
@@ -18,8 +18,8 @@
 package org.apache.ignite.internal.client.io.netty;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.AttributeKey;
@@ -60,11 +60,17 @@ public class NettyClientConnection implements ClientConnection {
     }
 
     /** {@inheritDoc} */
-    @Override public ChannelFuture send(ByteBuffer msg) throws IgniteException {
+    @Override public ChannelFuture send(ByteBuf msg) throws IgniteException {
+        // writeAndFlush releases pooled buffer.
         return channel.writeAndFlush(msg);
     }
 
     /** {@inheritDoc} */
+    @Override public ByteBuf getBuffer() {
+        return channel.alloc().buffer();
+    }
+
+    /** {@inheritDoc} */
     @Override public void close() {
         channel.close();
     }
@@ -75,7 +81,7 @@ public class NettyClientConnection implements ClientConnection {
      * @param buf Message.
      * @throws IOException when message can't be decoded.
      */
-    void onMessage(ByteBuffer buf) throws IOException {
+    void onMessage(ByteBuf buf) throws IOException {
         msgHnd.onMessage(buf);
     }
 
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
index c9cc4c8..b71fbcb 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.client.io.netty;
 
 import java.net.InetSocketAddress;
-
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
@@ -28,7 +27,6 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.ignite.client.IgniteClientConnectionException;
 import org.apache.ignite.client.proto.ClientMessageDecoder;
-import org.apache.ignite.client.proto.ClientMessageEncoder;
 import org.apache.ignite.internal.client.io.ClientConnection;
 import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
 import org.apache.ignite.internal.client.io.ClientConnectionStateHandler;
@@ -65,7 +63,6 @@ public class NettyClientConnectionMultiplexer implements ClientConnectionMultipl
                         throws Exception {
                     ch.pipeline().addLast(
                             new ClientMessageDecoder(),
-                            new ClientMessageEncoder(),
                             new NettyClientMessageHandler());
                 }
             });
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientMessageHandler.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientMessageHandler.java
index 670703c..840d560 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientMessageHandler.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientMessageHandler.java
@@ -18,8 +18,7 @@
 package org.apache.ignite.internal.client.io.netty;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 
@@ -31,11 +30,11 @@ import static org.apache.ignite.internal.client.io.netty.NettyClientConnection.A
 public class NettyClientMessageHandler extends ChannelInboundHandlerAdapter {
     /** {@inheritDoc} */
     @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
-        ctx.channel().attr(ATTR_CONN).get().onMessage((ByteBuffer) msg);
+        ctx.channel().attr(ATTR_CONN).get().onMessage((ByteBuf) msg);
     }
 
     /** {@inheritDoc} */
-    @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    @Override public void channelInactive(ChannelHandlerContext ctx) {
         ctx.channel().attr(ATTR_CONN).get().onDisconnected(null);
     }
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index a226ef6..dbaf32c 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -17,9 +17,10 @@
 
 package org.apache.ignite.client;
 
+import java.net.InetSocketAddress;
 import java.util.Collections;
-
 import io.netty.channel.ChannelFuture;
+import io.netty.util.ResourceLeakDetector;
 import org.apache.ignite.app.Ignite;
 import org.apache.ignite.client.fakes.FakeIgnite;
 import org.apache.ignite.client.handler.ClientHandlerModule;
@@ -42,6 +43,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 public abstract class AbstractClientTest {
     protected static final String DEFAULT_TABLE = "default_test_table";
 
+    protected static ConfigurationRegistry configurationRegistry;
+
     protected static ChannelFuture serverFuture;
 
     protected static Ignite server;
@@ -50,6 +53,8 @@ public abstract class AbstractClientTest {
 
     @BeforeAll
     public static void beforeAll() throws Exception {
+        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+
         serverFuture = startServer(null);
         client = startClient();
     }
@@ -59,6 +64,7 @@ public abstract class AbstractClientTest {
         client.close();
         serverFuture.cancel(true);
         serverFuture.await();
+        configurationRegistry.stop();
     }
 
     @BeforeEach
@@ -68,8 +74,11 @@ public abstract class AbstractClientTest {
     }
 
     public static Ignite startClient(String... addrs) {
-        if (addrs == null || addrs.length == 0)
-            addrs = new String[]{"127.0.0.2:10800"};
+        if (addrs == null || addrs.length == 0) {
+            var serverPort = ((InetSocketAddress)serverFuture.channel().localAddress()).getPort();
+
+            addrs = new String[]{"127.0.0.2:" + serverPort};
+        }
 
         var builder = IgniteClient.builder().addresses(addrs);
 
@@ -77,19 +86,19 @@ public abstract class AbstractClientTest {
     }
 
     public static ChannelFuture startServer(String host) throws InterruptedException {
-        var registry = new ConfigurationRegistry(
+        configurationRegistry = new ConfigurationRegistry(
                 Collections.singletonList(ClientConnectorConfiguration.KEY),
                 Collections.emptyMap(),
                 Collections.singletonList(new TestConfigurationStorage(ConfigurationType.LOCAL))
         );
 
-        registry.start();
+        configurationRegistry.start();
 
         server = new FakeIgnite();
 
         var module = new ClientHandlerModule(server, NOPLogger.NOP_LOGGER);
 
-        module.prepareStart(registry);
+        module.prepareStart(configurationRegistry);
 
         return module.start();
     }