You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/08/02 12:49:29 UTC
[ignite-3] branch main updated: IGNITE-15162 Thin 3.0: Add buffer pooling (#252)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5e37640 IGNITE-15162 Thin 3.0: Add buffer pooling (#252)
5e37640 is described below
commit 5e376402965e27e06154d81b0bfbbe6c05b85741
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Aug 2 15:49:23 2021 +0300
IGNITE-15162 Thin 3.0: Add buffer pooling (#252)
* Add server-side and client-side buffer pooling using Netty ByteBuf pooled allocator.
* https://netty.io/wiki/using-as-a-generic-library.html
* https://netty.io/wiki/reference-counted-objects.html
* Prepend message length in `ClientMessagePacker` to avoid extra buffer allocation and get rid of `ClientMessageEncoder`.
* Move request handling to separate classes (one class per message type).
* Stop `ConfigurationRegistry` after tests.
---
...essageEncoder.java => ClientMessageCommon.java} | 30 +-
.../ignite/client/proto/ClientMessageDecoder.java | 99 ++---
.../ignite/client/proto/ClientMessagePacker.java | 63 +++-
.../ignite/client/proto/ClientMessageUnpacker.java | 34 +-
.../client/proto/ClientMessageDecoderTest.java | 68 ++--
.../client/proto/ClientMessageEncoderTest.java | 53 ---
.../proto/ClientMessagePackerUnpackerTest.java | 66 +++-
.../ignite/client/handler/ClientHandlerModule.java | 3 -
.../handler/ClientInboundMessageHandler.java | 415 +++++----------------
.../requests/table/ClientSchemasGetRequest.java | 76 ++++
.../handler/requests/table/ClientTableCommon.java | 274 ++++++++++++++
.../requests/table/ClientTableDropRequest.java} | 47 +--
.../requests/table/ClientTableGetRequest.java} | 56 +--
.../requests/table/ClientTablesGetRequest.java} | 59 ++-
.../requests/table/ClientTupleGetRequest.java | 53 +++
.../requests/table/ClientTupleUpsertRequest.java} | 48 +--
.../table/ClientTupleUpsertSchemalessRequest.java} | 48 +--
.../handler/requests/table/package-info.java} | 17 +-
.../handler/ClientHandlerIntegrationTest.java | 10 +-
.../internal/client/PayloadInputChannel.java | 7 +-
.../internal/client/PayloadOutputChannel.java | 9 +-
.../ignite/internal/client/TcpClientChannel.java | 56 +--
.../internal/client/io/ClientConnection.java | 12 +-
.../internal/client/io/ClientMessageHandler.java | 4 +-
.../client/io/netty/NettyClientConnection.java | 12 +-
.../io/netty/NettyClientConnectionMultiplexer.java | 3 -
.../client/io/netty/NettyClientMessageHandler.java | 7 +-
.../apache/ignite/client/AbstractClientTest.java | 21 +-
28 files changed, 873 insertions(+), 777 deletions(-)
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageEncoder.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageCommon.java
similarity index 53%
rename from modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageEncoder.java
rename to modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageCommon.java
index a2dc56a..ae3411e 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageEncoder.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageCommon.java
@@ -17,31 +17,13 @@
package org.apache.ignite.client.proto;
-import java.nio.ByteBuffer;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToByteEncoder;
-
/**
- * Encodes client messages:
- * 1. MAGIC for first message.
- * 2. Payload length (varint).
- * 3. Payload (bytes).
+ * Common client message constants.
*/
-public class ClientMessageEncoder extends MessageToByteEncoder<ByteBuffer> {
- /** Magic encoded flag. */
- private boolean magicEncoded;
-
- /** {@inheritDoc} */
- @Override protected void encode(ChannelHandlerContext ctx, ByteBuffer message, ByteBuf out) {
- if (!magicEncoded) {
- out.writeBytes(ClientMessageDecoder.MAGIC_BYTES);
-
- magicEncoded = true;
- }
+public class ClientMessageCommon {
+ /** Message header size. */
+ public static final int HEADER_SIZE = 4;
- out.writeInt(message.remaining());
- out.writeBytes(message);
- }
+ /** Magic bytes before handshake. */
+ public static final byte[] MAGIC_BYTES = new byte[]{0x49, 0x47, 0x4E, 0x49}; // IGNI
}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageDecoder.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageDecoder.java
index df42f40..a149cbb 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageDecoder.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageDecoder.java
@@ -17,48 +17,43 @@
package org.apache.ignite.client.proto;
-import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.List;
-
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.CharsetUtil;
import org.apache.ignite.lang.IgniteException;
+import static org.apache.ignite.client.proto.ClientMessageCommon.HEADER_SIZE;
+import static org.apache.ignite.client.proto.ClientMessageCommon.MAGIC_BYTES;
+
/**
* Decodes full client messages:
* 1. MAGIC for first message.
- * 2. Payload length (varint).
- * 3. Payload (bytes).
+ * 2. Payload length (4 bytes).
+ * 3. Payload (N bytes).
*/
-public class ClientMessageDecoder extends ByteToMessageDecoder {
- /** Magic bytes before handshake. */
- public static final byte[] MAGIC_BYTES = new byte[]{0x49, 0x47, 0x4E, 0x49}; // IGNI
-
- /** Data buffer. */
- private byte[] data = new byte[4]; // TODO: Pooled buffers IGNITE-15162.
-
- /** Remaining byte count. */
- private int cnt = -4;
-
- /** Message size. */
- private int msgSize = -1;
-
+public class ClientMessageDecoder extends LengthFieldBasedFrameDecoder {
/** Magic decoded flag. */
private boolean magicDecoded;
/** Magic decoding failed flag. */
private boolean magicFailed;
+ /**
+ * Constructor.
+ */
+ public ClientMessageDecoder() {
+ // TODO: Configurable maximum frame length IGNITE-15164.
+ super(Integer.MAX_VALUE - HEADER_SIZE, 0, HEADER_SIZE, 0, HEADER_SIZE, true);
+ }
+
/** {@inheritDoc} */
- @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) {
- if (!readMagic(byteBuf))
- return;
+ @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+ if (!readMagic(in))
+ return null;
- while (read(byteBuf))
- list.add(ByteBuffer.wrap(data));
+ return super.decode(ctx, in);
}
/**
@@ -78,13 +73,10 @@ public class ClientMessageDecoder extends ByteToMessageDecoder {
if (byteBuf.readableBytes() < MAGIC_BYTES.length)
return false;
- assert data.length == MAGIC_BYTES.length;
-
- byteBuf.readBytes(data, 0, MAGIC_BYTES.length);
+ var data = new byte[MAGIC_BYTES.length];
+ byteBuf.readBytes(data);
magicDecoded = true;
- cnt = -1;
- msgSize = 0;
if (Arrays.equals(data, MAGIC_BYTES))
return true;
@@ -94,53 +86,4 @@ public class ClientMessageDecoder extends ByteToMessageDecoder {
throw new IgniteException("Invalid magic header in thin client connection. " +
"Expected 'IGNI', but was '" + new String(data, CharsetUtil.US_ASCII) + "'.");
}
-
- /**
- * Reads the buffer.
- *
- * @param buf Buffer.
- * @return True when a complete message has been received; false otherwise.
- * @throws IgniteException when message is invalid.
- */
- private boolean read(ByteBuf buf) {
- if (buf.readableBytes() == 0)
- return false;
-
- if (cnt < 0) {
- if (buf.readableBytes() < 4)
- return false;
-
- msgSize = buf.readInt();
-
- assert msgSize >= 0;
- data = new byte[msgSize];
- cnt = 0;
- }
-
- assert data != null;
- assert msgSize >= 0;
-
- int remaining = buf.readableBytes();
-
- if (remaining > 0) {
- int missing = msgSize - cnt;
-
- if (missing > 0) {
- int len = Math.min(missing, remaining);
-
- buf.readBytes(data, cnt, len);
-
- cnt += len;
- }
- }
-
- if (cnt == msgSize) {
- cnt = -1;
- msgSize = -1;
-
- return true;
- }
-
- return false;
- }
}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java
index 98fa6ce..9cc79c4 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java
@@ -22,21 +22,57 @@ import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.UUID;
-
-import org.msgpack.core.MessageBufferPacker;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import org.apache.ignite.lang.IgniteException;
import org.msgpack.core.MessagePack;
-import org.msgpack.core.buffer.ArrayBufferOutput;
+import org.msgpack.core.MessagePacker;
+import org.msgpack.core.buffer.OutputStreamBufferOutput;
+
+import static org.apache.ignite.client.proto.ClientMessageCommon.HEADER_SIZE;
/**
- * Ignite-specific MsgPack extension.
+ * Ignite-specific MsgPack extension based on Netty ByteBuf.
+ * <p>
+ * Releases wrapped buffer on {@link #close()} .
*/
-public class ClientMessagePacker extends MessageBufferPacker {
+public class ClientMessagePacker extends MessagePacker {
+ /** Underlying buffer. */
+ private final ByteBuf buf;
+
+ /** Closed flag. */
+ private boolean closed = false;
+
/**
* Constructor.
+ *
+ * @param buf Buffer.
*/
- public ClientMessagePacker() {
- // TODO: Pooled buffers IGNITE-15162.
- super(new ArrayBufferOutput(), MessagePack.DEFAULT_PACKER_CONFIG);
+ public ClientMessagePacker(ByteBuf buf) {
+ // Reserve 4 bytes for the message length.
+ super(new OutputStreamBufferOutput(new ByteBufOutputStream(buf.writerIndex(HEADER_SIZE))),
+ MessagePack.DEFAULT_PACKER_CONFIG);
+
+ this.buf = buf;
+ }
+
+ /**
+ * Gets the underlying buffer.
+ *
+ * @return Underlying buffer.
+ * @throws IgniteException When flush fails.
+ */
+ public ByteBuf getBuffer() {
+ try {
+ flush();
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+
+ buf.setInt(0, buf.writerIndex() - HEADER_SIZE);
+
+ return buf;
}
/**
@@ -122,4 +158,15 @@ public class ClientMessagePacker extends MessageBufferPacker {
// TODO: Support all basic types IGNITE-15163
throw new IOException("Unsupported type, can't serialize: " + val.getClass());
}
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ if (closed)
+ return;
+
+ closed = true;
+
+ if (buf.refCnt() > 0)
+ buf.release();
+ }
}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java
index e8230e8..6a2c76a 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java
@@ -22,13 +22,14 @@ import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.UUID;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
import org.apache.ignite.lang.IgniteException;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageSizeException;
import org.msgpack.core.MessageTypeException;
import org.msgpack.core.MessageUnpacker;
-import org.msgpack.core.buffer.MessageBufferInput;
+import org.msgpack.core.buffer.InputStreamBufferInput;
import static org.apache.ignite.client.proto.ClientDataType.BITMASK;
import static org.apache.ignite.client.proto.ClientDataType.BYTES;
@@ -42,16 +43,26 @@ import static org.apache.ignite.client.proto.ClientDataType.INT8;
import static org.apache.ignite.client.proto.ClientDataType.STRING;
/**
- * Ignite-specific MsgPack extension.
+ * Ignite-specific MsgPack extension based on Netty ByteBuf.
+ * <p>
+ * Releases wrapped buffer on {@link #close()} .
*/
public class ClientMessageUnpacker extends MessageUnpacker {
+ /** Underlying buffer. */
+ private final ByteBuf buf;
+
+ /** Closed flag. */
+ private boolean closed = false;
+
/**
* Constructor.
*
- * @param in Input.
+ * @param buf Input.
*/
- public ClientMessageUnpacker(MessageBufferInput in) {
- super(in, MessagePack.DEFAULT_UNPACKER_CONFIG);
+ public ClientMessageUnpacker(ByteBuf buf) {
+ super(new InputStreamBufferInput(new ByteBufInputStream(buf)), MessagePack.DEFAULT_UNPACKER_CONFIG);
+
+ this.buf = buf;
}
/**
@@ -152,4 +163,15 @@ public class ClientMessageUnpacker extends MessageUnpacker {
throw new IgniteException("Unknown client data type: " + dataType);
}
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ if (closed)
+ return;
+
+ closed = true;
+
+ if (buf.refCnt() > 0)
+ buf.release();
+ }
}
diff --git a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageDecoderTest.java b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageDecoderTest.java
index 9276102..f2497a6 100644
--- a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageDecoderTest.java
+++ b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageDecoderTest.java
@@ -17,16 +17,14 @@
package org.apache.ignite.client.proto;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import org.apache.ignite.client.proto.ClientMessageDecoder;
import org.apache.ignite.lang.IgniteException;
import org.junit.jupiter.api.Test;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
@@ -35,67 +33,33 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class ClientMessageDecoderTest {
@Test
void testEmptyBufferReturnsNoResults() throws Exception {
- var buf = new byte[0];
- var res = new ArrayList<>();
-
- new ClientMessageDecoder().decode(null, Unpooled.wrappedBuffer(buf), res);
+ var res = decode(new byte[0]);
- assertEquals(0, res.size());
+ assertNull(res);
}
@Test
- void testValidMagicAndMessageReturnsPayload() {
- var res = new ArrayList<>();
- new ClientMessageDecoder().decode(null, Unpooled.wrappedBuffer(getMagicWithPayload()), res);
-
- assertEquals(1, res.size());
+ void testValidMagicAndMessageReturnsPayload() throws Exception {
+ byte[] bytes = decode(getMagicWithPayload());
- var resBuf = (ByteBuffer)res.get(0);
- assertArrayEquals(new byte[]{33, 44}, resBuf.array());
+ assertArrayEquals(new byte[]{33, 44}, bytes);
}
@Test
void testInvalidMagicThrowsException() {
byte[] buf = {66, 69, 69, 70, 1, 2, 3};
- var t = assertThrows(IgniteException.class,
- () -> new ClientMessageDecoder().decode(null, Unpooled.wrappedBuffer(buf), new ArrayList<>()));
+ var t = assertThrows(IgniteException.class, () -> decode(buf));
assertEquals("Invalid magic header in thin client connection. Expected 'IGNI', but was 'BEEF'.",
t.getMessage());
}
- /**
- * Tests multipart buffer arrival: socket can split incoming stream into arbitrary chunks.
- */
- @Test
- void testMultipartValidMagicAndMessageReturnsPayload() throws Exception {
- var decoder = new ClientMessageDecoder();
- var res = new ArrayList<>();
-
- byte[] data = getMagicWithPayload();
-
- decoder.decode(null, Unpooled.wrappedBuffer(data, 0, 4), res);
- assertEquals(0, res.size());
-
- decoder.decode(null, Unpooled.wrappedBuffer(data, 4, 4), res);
- assertEquals(0, res.size());
-
- decoder.decode(null, Unpooled.wrappedBuffer(data, 8, 1), res);
- assertEquals(0, res.size());
-
- decoder.decode(null, Unpooled.wrappedBuffer(data, 9, 1), res);
- assertEquals(1, res.size());
-
- var resBuf = (ByteBuffer) res.get(0);
- assertArrayEquals(new byte[]{33, 44}, resBuf.array());
- }
-
- private byte[] getMagicWithPayload() {
+ private static byte[] getMagicWithPayload() {
var buf = new byte[10];
// Magic.
- System.arraycopy(ClientMessageDecoder.MAGIC_BYTES, 0, buf, 0, 4);
+ System.arraycopy(ClientMessageCommon.MAGIC_BYTES, 0, buf, 0, 4);
// Message size.
buf[7] = 2;
@@ -106,4 +70,16 @@ public class ClientMessageDecoderTest {
return buf;
}
+
+ private static byte[] decode(byte[] request) throws Exception {
+ var resBuf = (ByteBuf)new ClientMessageDecoder().decode(null, Unpooled.wrappedBuffer(request));
+
+ if (resBuf == null)
+ return null;
+
+ var bytes = new byte[resBuf.readableBytes()];
+ resBuf.readBytes(bytes);
+
+ return bytes;
+ }
}
diff --git a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageEncoderTest.java b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageEncoderTest.java
deleted file mode 100644
index f07bb1f..0000000
--- a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessageEncoderTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.client.proto;
-
-import io.netty.buffer.Unpooled;
-import org.apache.ignite.client.proto.ClientMessageEncoder;
-import org.junit.jupiter.api.Test;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-
-/**
- * Message encoding tests.
- */
-public class ClientMessageEncoderTest {
- @Test
- public void testEncodeIncludesMagicWithFirstMessage() {
- var encoder = new ClientMessageEncoder();
-
- byte[] res = encode(new byte[]{1, 2}, encoder);
- assertArrayEquals(new byte[]{0x49, 0x47, 0x4E, 0x49, 0, 0, 0, 2, 1, 2}, res);
-
- byte[] res2 = encode(new byte[]{7, 8, 9}, encoder);
- assertArrayEquals(new byte[]{0, 0, 0, 3, 7, 8, 9}, res2);
-
- byte[] res3 = encode(new byte[0], encoder);
- assertArrayEquals(new byte[]{0, 0, 0, 0}, res3);
- }
-
- private byte[] encode(byte[] array, ClientMessageEncoder encoder) {
- var target = Unpooled.buffer(100);
- encoder.encode(null, ByteBuffer.wrap(array), target);
-
- return Arrays.copyOf(target.array(), target.writerIndex());
- }
-}
diff --git a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java
index fefb14d..3d04434 100644
--- a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java
+++ b/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java
@@ -17,13 +17,11 @@
package org.apache.ignite.client.proto;
-import org.apache.ignite.client.proto.ClientMessagePacker;
-import org.apache.ignite.client.proto.ClientMessageUnpacker;
-import org.junit.jupiter.api.Test;
-import org.msgpack.core.buffer.ArrayBufferInput;
-
import java.io.IOException;
import java.util.UUID;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -32,19 +30,65 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
*/
public class ClientMessagePackerUnpackerTest {
@Test
+ public void testPackerCloseReleasesPooledBuffer() {
+ var buf = PooledByteBufAllocator.DEFAULT.directBuffer();
+ var packer = new ClientMessagePacker(buf);
+
+ assertEquals(1, buf.refCnt());
+
+ packer.close();
+
+ assertEquals(0, buf.refCnt());
+ }
+
+ @Test
+ public void testPackerIncludesFourByteMessageLength() throws IOException {
+ try (var packer = new ClientMessagePacker(PooledByteBufAllocator.DEFAULT.directBuffer())) {
+ packer.packInt(1); // 1 byte
+ packer.packString("Foo"); // 4 bytes
+
+ var buf = packer.getBuffer();
+ var len = buf.readInt();
+
+ assertEquals(5, len);
+ assertEquals(9, buf.writerIndex());
+ assertEquals(Integer.MAX_VALUE, buf.maxCapacity());
+ }
+ }
+
+ @Test
+ public void testEmptyPackerReturnsFourZeroBytes() {
+ try (var packer = new ClientMessagePacker(PooledByteBufAllocator.DEFAULT.directBuffer())) {
+ var buf = packer.getBuffer();
+ var len = buf.readInt();
+
+ assertEquals(0, len);
+ assertEquals(4, buf.writerIndex());
+ }
+ }
+
+ @Test
public void testUUID() throws IOException {
testUUID(UUID.randomUUID());
testUUID(new UUID(0, 0));
}
private void testUUID(UUID u) throws IOException {
- var packer = new ClientMessagePacker();
- packer.packUuid(u);
- byte[] data = packer.toByteArray();
+ try (var packer = new ClientMessagePacker(PooledByteBufAllocator.DEFAULT.directBuffer())) {
+ packer.packUuid(u);
+
+ var buf = packer.getBuffer();
+ var len = buf.readInt();
+
+ byte[] data = new byte[buf.readableBytes()];
+ buf.readBytes(data);
- var unpacker = new ClientMessageUnpacker(new ArrayBufferInput(data));
- var res = unpacker.unpackUuid();
+ try (var unpacker = new ClientMessageUnpacker(Unpooled.wrappedBuffer(data))) {
+ var res = unpacker.unpackUuid();
- assertEquals(u, res);
+ assertEquals(18, len); // 1 ext + 1 ext type + 16 UUID data
+ assertEquals(u, res);
+ }
+ }
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 18791f8..59ff857 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -18,7 +18,6 @@
package org.apache.ignite.client.handler;
import java.net.BindException;
-
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -30,7 +29,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.client.proto.ClientMessageDecoder;
-import org.apache.ignite.client.proto.ClientMessageEncoder;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.lang.IgniteException;
@@ -109,7 +107,6 @@ public class ClientHandlerModule {
protected void initChannel(Channel ch) {
ch.pipeline().addLast(
new ClientMessageDecoder(),
- new ClientMessageEncoder(),
new ClientInboundMessageHandler(ignite, log));
}
})
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 1510691..b5436b2 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -18,35 +18,29 @@
package org.apache.ignite.client.handler;
import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
import java.util.BitSet;
-import java.util.List;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.ignite.app.Ignite;
-import org.apache.ignite.client.proto.ClientDataType;
+import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTableDropRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTablesGetRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleGetRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertSchemalessRequest;
import org.apache.ignite.client.proto.ClientErrorCode;
+import org.apache.ignite.client.proto.ClientMessageCommon;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.client.proto.ClientOp;
import org.apache.ignite.client.proto.ProtocolVersion;
import org.apache.ignite.client.proto.ServerMessageType;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.NativeTypeSpec;
-import org.apache.ignite.internal.schema.SchemaAware;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.table.IgniteTablesInternal;
-import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.table.Table;
-import org.apache.ignite.table.Tuple;
-import org.apache.ignite.table.TupleBuilder;
-import org.msgpack.core.MessageFormat;
-import org.msgpack.core.buffer.ByteBufferInput;
import org.slf4j.Logger;
/**
@@ -78,21 +72,22 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
}
/** {@inheritDoc} */
- @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
- var buf = (ByteBuffer) msg;
-
- var unpacker = getUnpacker(buf);
- var packer = getPacker();
-
- if (clientContext == null)
- handshake(ctx, unpacker, packer);
- else
- processOperation(ctx, unpacker, packer);
+ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ // Each inbound handler in a pipeline has to release the received messages.
+ try (var unpacker = getUnpacker((ByteBuf) msg)) {
+ // Packer buffer is released by Netty on send, or by inner exception handlers below.
+ var packer = getPacker(ctx.alloc());
+
+ if (clientContext == null)
+ handshake(ctx, unpacker, packer);
+ else
+ processOperation(ctx, unpacker, packer);
+ }
}
- private void handshake(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer)
- throws IOException {
+ private void handshake(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer) {
try {
+ writeMagic(ctx);
var clientVer = ProtocolVersion.unpack(unpacker);
if (!clientVer.equals(ProtocolVersion.LATEST_VER))
@@ -120,28 +115,42 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
write(packer, ctx);
}
catch (Throwable t) {
- packer = getPacker();
+ packer.close();
- ProtocolVersion.LATEST_VER.pack(packer);
- packer.packInt(ClientErrorCode.FAILED).packString(t.getMessage());
+ var errPacker = getPacker(ctx.alloc());
- write(packer, ctx);
+ try {
+ ProtocolVersion.LATEST_VER.pack(errPacker);
+ errPacker.packInt(ClientErrorCode.FAILED).packString(t.getMessage());
+
+ write(errPacker, ctx);
+ }
+ catch (Throwable t2) {
+ errPacker.close();
+ exceptionCaught(ctx, t2);
+ }
}
}
+ private void writeMagic(ChannelHandlerContext ctx) {
+ ctx.write(Unpooled.wrappedBuffer(ClientMessageCommon.MAGIC_BYTES));
+ }
+
private void write(ClientMessagePacker packer, ChannelHandlerContext ctx) {
- var buf = packer.toMessageBuffer().sliceAsByteBuffer();
+ var buf = packer.getBuffer();
+ // writeAndFlush releases pooled buffer.
ctx.writeAndFlush(buf);
}
- private void writeError(int requestId, Throwable err, ChannelHandlerContext ctx) {
+ private void writeError(long requestId, Throwable err, ChannelHandlerContext ctx) {
+ var packer = getPacker(ctx.alloc());
+
try {
assert err != null;
- ClientMessagePacker packer = getPacker();
packer.packInt(ServerMessageType.RESPONSE);
- packer.packInt(requestId);
+ packer.packLong(requestId);
packer.packInt(ClientErrorCode.FAILED);
String msg = err.getMessage();
@@ -154,329 +163,87 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
write(packer, ctx);
}
catch (Throwable t) {
+ packer.close();
exceptionCaught(ctx, t);
}
}
- private ClientMessagePacker getPacker() {
- return new ClientMessagePacker();
+ private ClientMessagePacker getPacker(ByteBufAllocator alloc) {
+ // Outgoing messages are released on write.
+ return new ClientMessagePacker(alloc.buffer());
}
- private ClientMessageUnpacker getUnpacker(ByteBuffer buf) {
- return new ClientMessageUnpacker(new ByteBufferInput(buf));
+ private ClientMessageUnpacker getUnpacker(ByteBuf buf) {
+ return new ClientMessageUnpacker(buf);
}
- private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer) throws IOException {
- var opCode = unpacker.unpackInt();
- var requestId = unpacker.unpackInt();
-
- packer.packInt(ServerMessageType.RESPONSE)
- .packInt(requestId)
- .packInt(ClientErrorCode.SUCCESS);
+ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker in, ClientMessagePacker out) {
+ long requestId = -1;
try {
- var fut = processOperation(unpacker, packer, opCode);
+ var opCode = in.unpackInt();
+ requestId = in.unpackLong();
+
+ out.packInt(ServerMessageType.RESPONSE)
+ .packLong(requestId)
+ .packInt(ClientErrorCode.SUCCESS);
+
+ var fut = processOperation(in, out, opCode);
if (fut == null) {
// Operation completed synchronously.
- write(packer, ctx);
+ write(out, ctx);
}
else {
+ var reqId = requestId;
+
fut.whenComplete((Object res, Object err) -> {
- if (err != null)
- writeError(requestId, (Throwable) err, ctx);
- else
- write(packer, ctx);
+ if (err != null) {
+ out.close();
+ writeError(reqId, (Throwable) err, ctx);
+ } else
+ write(out, ctx);
});
}
-
}
catch (Throwable t) {
+ out.close();
+
writeError(requestId, t, ctx);
}
}
- private CompletableFuture processOperation(ClientMessageUnpacker unpacker, ClientMessagePacker packer, int opCode)
- throws IOException {
+ private CompletableFuture processOperation(
+ ClientMessageUnpacker in,
+ ClientMessagePacker out,
+ int opCode
+ ) throws IOException {
// TODO: Handle all operations asynchronously (add async table API).
switch (opCode) {
- case ClientOp.TABLE_DROP: {
- var tableName = unpacker.unpackString();
-
- ignite.tables().dropTable(tableName);
-
- break;
- }
-
- case ClientOp.TABLES_GET: {
- List<Table> tables = ignite.tables().tables();
-
- packer.packMapHeader(tables.size());
-
- for (var table : tables) {
- var tableImpl = (TableImpl) table;
-
- packer.packUuid(tableImpl.tableId());
- packer.packString(table.tableName());
- }
-
- break;
- }
-
- case ClientOp.SCHEMAS_GET: {
- var table = readTable(unpacker);
-
- if (unpacker.getNextFormat() == MessageFormat.NIL) {
- // Return the latest schema.
- packer.packMapHeader(1);
+ case ClientOp.TABLE_DROP:
+ return ClientTableDropRequest.process(in, ignite.tables());
- var schema = table.schemaView().schema();
+ case ClientOp.TABLES_GET:
+ return ClientTablesGetRequest.process(out, ignite.tables());
- if (schema == null)
- throw new IgniteException("Schema registry is not initialized.");
+ case ClientOp.SCHEMAS_GET:
+ return ClientSchemasGetRequest.process(in, out, ignite.tables());
- writeSchema(packer, schema.version(), schema);
- }
- else {
- var cnt = unpacker.unpackArrayHeader();
- packer.packMapHeader(cnt);
+ case ClientOp.TABLE_GET:
+ return ClientTableGetRequest.process(in, out, ignite.tables());
- for (var i = 0; i < cnt; i++) {
- var schemaVer = unpacker.unpackInt();
- var schema = table.schemaView().schema(schemaVer);
- writeSchema(packer, schemaVer, schema);
- }
- }
+ case ClientOp.TUPLE_UPSERT:
+ return ClientTupleUpsertRequest.process(in, ignite.tables());
- break;
- }
-
- case ClientOp.TABLE_GET: {
- String tableName = unpacker.unpackString();
- Table table = ignite.tables().table(tableName);
-
- if (table == null)
- packer.packNil();
- else
- packer.packUuid(((TableImpl) table).tableId());
-
- break;
- }
-
- case ClientOp.TUPLE_UPSERT: {
- var table = readTable(unpacker);
- var tuple = readTuple(unpacker, table, false);
-
- return table.upsertAsync(tuple);
- }
+ case ClientOp.TUPLE_UPSERT_SCHEMALESS:
+ return ClientTupleUpsertSchemalessRequest.process(in, ignite.tables());
- case ClientOp.TUPLE_UPSERT_SCHEMALESS: {
- var table = readTable(unpacker);
- var tuple = readTupleSchemaless(unpacker, table);
-
- return table.upsertAsync(tuple);
- }
-
- case ClientOp.TUPLE_GET: {
- var table = readTable(unpacker);
- var keyTuple = readTuple(unpacker, table, true);
-
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(packer, t));
- }
+ case ClientOp.TUPLE_GET:
+ return ClientTupleGetRequest.process(in, out, ignite.tables());
default:
throw new IgniteException("Unexpected operation code: " + opCode);
}
-
- return null;
- }
-
- private void writeSchema(ClientMessagePacker packer, int schemaVer, SchemaDescriptor schema) throws IOException {
- packer.packInt(schemaVer);
-
- if (schema == null) {
- packer.packNil();
-
- return;
- }
-
- var colCnt = schema.columnNames().size();
- packer.packArrayHeader(colCnt);
-
- for (var colIdx = 0; colIdx < colCnt; colIdx++) {
- var col = schema.column(colIdx);
-
- packer.packArrayHeader(4);
- packer.packString(col.name());
- packer.packInt(getClientDataType(col.type().spec()));
- packer.packBoolean(schema.isKeyColumn(colIdx));
- packer.packBoolean(col.nullable());
- }
- }
-
- private void writeTuple(ClientMessagePacker packer, Tuple tuple) {
- try {
- if (tuple == null) {
- packer.packNil();
-
- return;
- }
-
- var schema = ((SchemaAware) tuple).schema();
-
- packer.packInt(schema.version());
-
- for (var col : schema.keyColumns().columns())
- writeColumnValue(packer, tuple, col);
-
- for (var col : schema.valueColumns().columns())
- writeColumnValue(packer, tuple, col);
- }
- catch (Throwable t) {
- throw new IgniteException("Failed to serialize tuple", t);
- }
- }
-
- private Tuple readTuple(ClientMessageUnpacker unpacker, TableImpl table, boolean keyOnly) throws IOException {
- var schemaId = unpacker.unpackInt();
- var schema = table.schemaView().schema(schemaId);
- var builder = table.tupleBuilder();
-
- var cnt = keyOnly ? schema.keyColumns().length() : schema.length();
-
- for (int i = 0; i < cnt; i++) {
- if (unpacker.getNextFormat() == MessageFormat.NIL) {
- unpacker.skipValue();
- continue;
- }
-
- readAndSetColumnValue(unpacker, builder, schema.column(i));
- }
-
- return builder.build();
- }
-
- private Tuple readTupleSchemaless(ClientMessageUnpacker unpacker, TableImpl table) throws IOException {
- var cnt = unpacker.unpackMapHeader();
- var builder = table.tupleBuilder();
-
- for (int i = 0; i < cnt; i++) {
- var colName = unpacker.unpackString();
-
- builder.set(colName, unpacker.unpackValue());
- }
-
- return builder.build();
- }
-
- private TableImpl readTable(ClientMessageUnpacker unpacker) throws IOException {
- var tableId = unpacker.unpackUuid();
-
- return ((IgniteTablesInternal)ignite.tables()).table(tableId);
- }
-
- private void readAndSetColumnValue(ClientMessageUnpacker unpacker, TupleBuilder builder, Column col)
- throws IOException {
- builder.set(col.name(), unpacker.unpackObject(getClientDataType(col.type().spec())));
- }
-
- private static int getClientDataType(NativeTypeSpec spec) {
- switch (spec) {
- case INT8:
- return ClientDataType.INT8;
-
- case INT16:
- return ClientDataType.INT16;
-
- case INT32:
- return ClientDataType.INT32;
-
- case INT64:
- return ClientDataType.INT64;
-
- case FLOAT:
- return ClientDataType.FLOAT;
-
- case DOUBLE:
- return ClientDataType.DOUBLE;
-
- case DECIMAL:
- return ClientDataType.DECIMAL;
-
- case UUID:
- return ClientDataType.UUID;
-
- case STRING:
- return ClientDataType.STRING;
-
- case BYTES:
- return ClientDataType.BYTES;
-
- case BITMASK:
- return ClientDataType.BITMASK;
- }
-
- throw new IgniteException("Unsupported native type: " + spec);
- }
-
- private void writeColumnValue(ClientMessagePacker packer, Tuple tuple, Column col) throws IOException {
- var val = tuple.value(col.name());
-
- if (val == null) {
- packer.packNil();
- return;
- }
-
- switch (col.type().spec()) {
- case INT8:
- packer.packByte((byte) val);
- break;
-
- case INT16:
- packer.packShort((short) val);
- break;
-
- case INT32:
- packer.packInt((int) val);
- break;
-
- case INT64:
- packer.packLong((long) val);
- break;
-
- case FLOAT:
- packer.packFloat((float) val);
- break;
-
- case DOUBLE:
- packer.packDouble((double) val);
- break;
-
- case DECIMAL:
- packer.packDecimal((BigDecimal) val);
- break;
-
- case UUID:
- packer.packUuid((UUID) val);
- break;
-
- case STRING:
- packer.packString((String) val);
- break;
-
- case BYTES:
- byte[] bytes = (byte[]) val;
- packer.packBinaryHeader(bytes.length);
- packer.writePayload(bytes);
- break;
-
- case BITMASK:
- packer.packBitSet((BitSet)val);
- break;
-
- default:
- throw new IgniteException("Data type not supported: " + col.type());
- }
}
/** {@inheritDoc} */
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientSchemasGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientSchemasGetRequest.java
new file mode 100644
index 0000000..e89e68a
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientSchemasGetRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.proto.ClientMessagePacker;
+import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.msgpack.core.MessageFormat;
+
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeSchema;
+
+/**
+ * Client schemas retrieval request.
+ */
+public class ClientSchemasGetRequest {
+ /**
+ * Processes the request.
+ *
+ * @param in Unpacker.
+ * @param out Packer.
+ * @param tables Ignite tables.
+ * @return Future.
+ * @throws IOException On serialization error.
+ * @throws IgniteException When schema registry is not initialized.
+ */
+ public static CompletableFuture<Object> process(
+ ClientMessageUnpacker in,
+ ClientMessagePacker out,
+ IgniteTables tables
+ ) throws IOException {
+ var table = readTable(in, tables);
+
+ if (in.getNextFormat() == MessageFormat.NIL) {
+ // Return the latest schema.
+ out.packMapHeader(1);
+
+ var schema = table.schemaView().schema();
+
+ if (schema == null)
+ throw new IgniteException("Schema registry is not initialized.");
+
+ writeSchema(out, schema.version(), schema);
+ }
+ else {
+ var cnt = in.unpackArrayHeader();
+ out.packMapHeader(cnt);
+
+ for (var i = 0; i < cnt; i++) {
+ var schemaVer = in.unpackInt();
+ var schema = table.schemaView().schema(schemaVer);
+ writeSchema(out, schemaVer, schema);
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
new file mode 100644
index 0000000..74ef6c2
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.client.proto.ClientDataType;
+import org.apache.ignite.client.proto.ClientMessagePacker;
+import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaAware;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.TupleBuilder;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.msgpack.core.MessageFormat;
+
+/**
+ * Common table functionality.
+ */
+class ClientTableCommon {
+ /**
+ * Writes a schema.
+ *
+ * @param packer Packer.
+ * @param schemaVer Schema version.
+ * @param schema Schema.
+ * @throws IOException When serialization fails.
+ */
+ public static void writeSchema(
+ ClientMessagePacker packer,
+ int schemaVer,
+ SchemaDescriptor schema
+ ) throws IOException {
+ packer.packInt(schemaVer);
+
+ if (schema == null) {
+ packer.packNil();
+
+ return;
+ }
+
+ var colCnt = schema.columnNames().size();
+ packer.packArrayHeader(colCnt);
+
+ for (var colIdx = 0; colIdx < colCnt; colIdx++) {
+ var col = schema.column(colIdx);
+
+ packer.packArrayHeader(4);
+ packer.packString(col.name());
+ packer.packInt(getClientDataType(col.type().spec()));
+ packer.packBoolean(schema.isKeyColumn(colIdx));
+ packer.packBoolean(col.nullable());
+ }
+ }
+
+ /**
+ * Writes a tuple.
+ *
+ * @param packer Packer.
+ * @param tuple Tuple.
+ * @throws IgniteException on failed serialization.
+ */
+ public static void writeTuple(ClientMessagePacker packer, Tuple tuple) {
+ try {
+ if (tuple == null) {
+ packer.packNil();
+
+ return;
+ }
+
+ var schema = ((SchemaAware) tuple).schema();
+
+ packer.packInt(schema.version());
+
+ for (var col : schema.keyColumns().columns())
+ writeColumnValue(packer, tuple, col);
+
+ for (var col : schema.valueColumns().columns())
+ writeColumnValue(packer, tuple, col);
+ }
+ catch (Throwable t) {
+ throw new IgniteException("Failed to serialize tuple", t);
+ }
+ }
+
+ /**
+ * Reads a tuple.
+ *
+ * @param unpacker Unpacker.
+ * @param table Table.
+ * @param keyOnly Whether only key fields are expected.
+ * @return Tuple.
+ * @throws IOException When deserialization fails.
+ */
+ public static Tuple readTuple(ClientMessageUnpacker unpacker, TableImpl table, boolean keyOnly) throws IOException {
+ var schemaId = unpacker.unpackInt();
+ var schema = table.schemaView().schema(schemaId);
+ var builder = table.tupleBuilder();
+
+ var cnt = keyOnly ? schema.keyColumns().length() : schema.length();
+
+ for (int i = 0; i < cnt; i++) {
+ if (unpacker.getNextFormat() == MessageFormat.NIL) {
+ unpacker.skipValue();
+ continue;
+ }
+
+ readAndSetColumnValue(unpacker, builder, schema.column(i));
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Reads a tuple as a map, without schema.
+ *
+ * @param unpacker Unpacker.
+ * @param table Table.
+ * @return Tuple.
+ * @throws IOException When deserialization fails.
+ */
+ public static Tuple readTupleSchemaless(ClientMessageUnpacker unpacker, TableImpl table) throws IOException {
+ var cnt = unpacker.unpackMapHeader();
+ var builder = table.tupleBuilder();
+
+ for (int i = 0; i < cnt; i++) {
+ var colName = unpacker.unpackString();
+
+ builder.set(colName, unpacker.unpackValue());
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Reads a table.
+ *
+ * @param unpacker Unpacker.
+ * @param tables Ignite tables.
+ * @return Table.
+ * @throws IOException When deserialization fails.
+ */
+ public static TableImpl readTable(ClientMessageUnpacker unpacker, IgniteTables tables) throws IOException {
+ var tableId = unpacker.unpackUuid();
+
+ return ((IgniteTablesInternal)tables).table(tableId);
+ }
+
+ private static void readAndSetColumnValue(ClientMessageUnpacker unpacker, TupleBuilder builder, Column col)
+ throws IOException {
+ builder.set(col.name(), unpacker.unpackObject(getClientDataType(col.type().spec())));
+ }
+
+ private static int getClientDataType(NativeTypeSpec spec) {
+ switch (spec) {
+ case INT8:
+ return ClientDataType.INT8;
+
+ case INT16:
+ return ClientDataType.INT16;
+
+ case INT32:
+ return ClientDataType.INT32;
+
+ case INT64:
+ return ClientDataType.INT64;
+
+ case FLOAT:
+ return ClientDataType.FLOAT;
+
+ case DOUBLE:
+ return ClientDataType.DOUBLE;
+
+ case DECIMAL:
+ return ClientDataType.DECIMAL;
+
+ case UUID:
+ return ClientDataType.UUID;
+
+ case STRING:
+ return ClientDataType.STRING;
+
+ case BYTES:
+ return ClientDataType.BYTES;
+
+ case BITMASK:
+ return ClientDataType.BITMASK;
+ }
+
+ throw new IgniteException("Unsupported native type: " + spec);
+ }
+
+ private static void writeColumnValue(ClientMessagePacker packer, Tuple tuple, Column col) throws IOException {
+ var val = tuple.value(col.name());
+
+ if (val == null) {
+ packer.packNil();
+ return;
+ }
+
+ switch (col.type().spec()) {
+ case INT8:
+ packer.packByte((byte) val);
+ break;
+
+ case INT16:
+ packer.packShort((short) val);
+ break;
+
+ case INT32:
+ packer.packInt((int) val);
+ break;
+
+ case INT64:
+ packer.packLong((long) val);
+ break;
+
+ case FLOAT:
+ packer.packFloat((float) val);
+ break;
+
+ case DOUBLE:
+ packer.packDouble((double) val);
+ break;
+
+ case DECIMAL:
+ packer.packDecimal((BigDecimal) val);
+ break;
+
+ case UUID:
+ packer.packUuid((UUID) val);
+ break;
+
+ case STRING:
+ packer.packString((String) val);
+ break;
+
+ case BYTES:
+ byte[] bytes = (byte[]) val;
+ packer.packBinaryHeader(bytes.length);
+ packer.writePayload(bytes);
+ break;
+
+ case BITMASK:
+ packer.packBitSet((BitSet)val);
+ break;
+
+ default:
+ throw new IgniteException("Data type not supported: " + col.type());
+ }
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
similarity index 57%
copy from modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
index d5252fe..32b580e 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
@@ -15,46 +15,33 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client;
+package org.apache.ignite.client.handler.requests.table;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.table.manager.IgniteTables;
/**
- * Thin client payload input channel.
+ * Client table drop request.
*/
-public class PayloadInputChannel {
- /** Client channel. */
- private final ClientChannel ch;
-
- /** Input stream. */
- private final ClientMessageUnpacker in;
-
+public class ClientTableDropRequest {
/**
- * Constructor.
+ * Processes the request.
*
- * @param ch Channel.
* @param in Unpacker.
+ * @param tables Ignite tables.
+ * @return Future.
+ * @throws IOException On serialization error.
*/
- PayloadInputChannel(ClientChannel ch, ClientMessageUnpacker in) {
- this.in = in;
- this.ch = ch;
- }
+ public static CompletableFuture<Object> process(
+ ClientMessageUnpacker in,
+ IgniteTables tables
+ ) throws IOException {
+ var tableName = in.unpackString();
- /**
- * Gets client channel.
- *
- * @return Client channel.
- */
- public ClientChannel clientChannel() {
- return ch;
- }
+ tables.dropTable(tableName);
- /**
- * Gets the unpacker.
- *
- * @return Unpacker.
- */
- public ClientMessageUnpacker in() {
- return in;
+ return null;
}
}
diff --git a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java
similarity index 50%
copy from modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java
index fefb14d..5e48025 100644
--- a/modules/client-common/src/test/java/org/apache/ignite/client/proto/ClientMessagePackerUnpackerTest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java
@@ -15,36 +15,42 @@
* limitations under the License.
*/
-package org.apache.ignite.client.proto;
+package org.apache.ignite.client.handler.requests.table;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
-import org.junit.jupiter.api.Test;
-import org.msgpack.core.buffer.ArrayBufferInput;
-
-import java.io.IOException;
-import java.util.UUID;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.manager.IgniteTables;
/**
- * Tests Ignite-specific MsgPack extensions.
+ * Client table retrieval request.
*/
-public class ClientMessagePackerUnpackerTest {
- @Test
- public void testUUID() throws IOException {
- testUUID(UUID.randomUUID());
- testUUID(new UUID(0, 0));
- }
-
- private void testUUID(UUID u) throws IOException {
- var packer = new ClientMessagePacker();
- packer.packUuid(u);
- byte[] data = packer.toByteArray();
-
- var unpacker = new ClientMessageUnpacker(new ArrayBufferInput(data));
- var res = unpacker.unpackUuid();
-
- assertEquals(u, res);
+public class ClientTableGetRequest {
+ /**
+ * Processes the request.
+ *
+ * @param in Unpacker.
+ * @param out Packer.
+ * @param tables Ignite tables.
+ * @return Future.
+ * @throws IOException On serialization error.
+ */
+ public static CompletableFuture<Object> process(
+ ClientMessageUnpacker in,
+ ClientMessagePacker out,
+ IgniteTables tables
+ ) throws IOException {
+ String tableName = in.unpackString();
+ Table table = tables.table(tableName);
+
+ if (table == null)
+ out.packNil();
+ else
+ out.packUuid(((TableImpl) table).tableId());
+
+ return null;
}
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java
similarity index 50%
copy from modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java
index 0db8c1c..a05dcd4 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java
@@ -15,52 +15,41 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client;
+package org.apache.ignite.client.handler.requests.table;
import java.io.IOException;
-
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.manager.IgniteTables;
/**
- * Thin client payload output channel.
+ * Client tables retrieval request.
*/
-public class PayloadOutputChannel implements AutoCloseable {
- /** Client channel. */
- private final ClientChannel ch;
-
- /** Output stream. */
- private final ClientMessagePacker out;
-
+public class ClientTablesGetRequest {
/**
- * Constructor.
+ * Processes the request.
*
- * @param ch Channel.
+ * @param out Packer.
+ * @param igniteTables Ignite tables.
+ * @return Future.
+ * @throws IOException On serialization error.
*/
- PayloadOutputChannel(ClientChannel ch) {
- out = new ClientMessagePacker();
- this.ch = ch;
- }
+ public static CompletableFuture<Object> process(
+ ClientMessagePacker out,
+ IgniteTables igniteTables
+ ) throws IOException {
+ var tables = igniteTables.tables();
- /**
- * Gets client channel.
- *
- * @return Client channel.
- */
- public ClientChannel clientChannel() {
- return ch;
- }
+ out.packMapHeader(tables.size());
- /**
- * Gets the unpacker.
- *
- * @return Unpacker.
- */
- public ClientMessagePacker out() {
- return out;
- }
+ for (var table : tables) {
+ var tableImpl = (TableImpl) table;
+
+ out.packUuid(tableImpl.tableId());
+ out.packString(table.tableName());
+ }
- /** {@inheritDoc} */
- @Override public void close() throws IOException {
- out.close();
+ return null;
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
new file mode 100644
index 0000000..bc3bce0
--- /dev/null
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.proto.ClientMessagePacker;
+import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.table.manager.IgniteTables;
+
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
+
+/**
+ * Client tuple get request.
+ */
+public class ClientTupleGetRequest {
+ /**
+ * Processes the request.
+ *
+ * @param in Unpacker.
+ * @param out Packer.
+ * @param tables Ignite tables.
+ * @return Future.
+ * @throws IOException On serialization error.
+ */
+ public static CompletableFuture<Void> process(
+ ClientMessageUnpacker in,
+ ClientMessagePacker out,
+ IgniteTables tables
+ ) throws IOException {
+ var table = readTable(in, tables);
+ var keyTuple = readTuple(in, table, true);
+
+ return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ }
+}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
similarity index 52%
copy from modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
index d5252fe..8b98ea7 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
@@ -15,46 +15,32 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client;
+package org.apache.ignite.client.handler.requests.table;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.table.manager.IgniteTables;
+
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
/**
- * Thin client payload input channel.
+ * Client tuple upsert request.
*/
-public class PayloadInputChannel {
- /** Client channel. */
- private final ClientChannel ch;
-
- /** Input stream. */
- private final ClientMessageUnpacker in;
-
+public class ClientTupleUpsertRequest {
/**
- * Constructor.
+ * Processes the request.
*
- * @param ch Channel.
* @param in Unpacker.
+ * @param tables Ignite tables.
+ * @return Future.
+ * @throws IOException On serialization error.
*/
- PayloadInputChannel(ClientChannel ch, ClientMessageUnpacker in) {
- this.in = in;
- this.ch = ch;
- }
-
- /**
- * Gets client channel.
- *
- * @return Client channel.
- */
- public ClientChannel clientChannel() {
- return ch;
- }
+ public static CompletableFuture<Void> process(ClientMessageUnpacker in, IgniteTables tables) throws IOException {
+ var table = readTable(in, tables);
+ var tuple = readTuple(in, table, false);
- /**
- * Gets the unpacker.
- *
- * @return Unpacker.
- */
- public ClientMessageUnpacker in() {
- return in;
+ return table.upsertAsync(tuple);
}
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertSchemalessRequest.java
similarity index 52%
copy from modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertSchemalessRequest.java
index d5252fe..2a0bbcc 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertSchemalessRequest.java
@@ -15,46 +15,32 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client;
+package org.apache.ignite.client.handler.requests.table;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.table.manager.IgniteTables;
+
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTupleSchemaless;
/**
- * Thin client payload input channel.
+ * Client schemaless tuple upsert request.
*/
-public class PayloadInputChannel {
- /** Client channel. */
- private final ClientChannel ch;
-
- /** Input stream. */
- private final ClientMessageUnpacker in;
-
+public class ClientTupleUpsertSchemalessRequest {
/**
- * Constructor.
+ * Processes the request.
*
- * @param ch Channel.
* @param in Unpacker.
+ * @param tables Ignite tables.
+ * @return Future.
+ * @throws IOException On serialization error.
*/
- PayloadInputChannel(ClientChannel ch, ClientMessageUnpacker in) {
- this.in = in;
- this.ch = ch;
- }
-
- /**
- * Gets client channel.
- *
- * @return Client channel.
- */
- public ClientChannel clientChannel() {
- return ch;
- }
+ public static CompletableFuture<Void> process(ClientMessageUnpacker in, IgniteTables tables) throws IOException {
+ var table = readTable(in, tables);
+ var tuple = readTupleSchemaless(in, table);
- /**
- * Gets the unpacker.
- *
- * @return Unpacker.
- */
- public ClientMessageUnpacker in() {
- return in;
+ return table.upsertAsync(tuple);
}
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/package-info.java
similarity index 67%
copy from modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/package-info.java
index da437a3..a857f7a 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/package-info.java
@@ -15,20 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.io;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
/**
- * Handles thin client responses and server to client notifications.
+ * Table API requests.
*/
-public interface ClientMessageHandler {
- /**
- * Handles messages from the server.
- *
- * @param buf Buffer.
- * @throws IOException on failure.
- */
- void onMessage(ByteBuffer buf) throws IOException;
-}
+package org.apache.ignite.client.handler.requests.table;
diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java
index ad482d2..67a8454 100644
--- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java
+++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Collections;
-
import io.netty.channel.ChannelFuture;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.configuration.annotation.ConfigurationType;
@@ -47,6 +46,8 @@ public class ClientHandlerIntegrationTest {
private ChannelFuture serverFuture;
+ private ConfigurationRegistry configurationRegistry;
+
@BeforeEach
public void setUp() throws Exception {
serverFuture = startServer();
@@ -56,6 +57,7 @@ public class ClientHandlerIntegrationTest {
public void tearDown() throws Exception {
serverFuture.cancel(true);
serverFuture.await();
+ configurationRegistry.stop();
}
@Test
@@ -170,17 +172,17 @@ public class ClientHandlerIntegrationTest {
}
private ChannelFuture startServer() throws InterruptedException {
- var registry = new ConfigurationRegistry(
+ configurationRegistry = new ConfigurationRegistry(
Collections.singletonList(ClientConnectorConfiguration.KEY),
Collections.emptyMap(),
Collections.singletonList(new TestConfigurationStorage(ConfigurationType.LOCAL))
);
- registry.start();
+ configurationRegistry.start();
var module = new ClientHandlerModule(mock(Ignite.class), NOPLogger.NOP_LOGGER);
- module.prepareStart(registry);
+ module.prepareStart(configurationRegistry);
return module.start();
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
index d5252fe..c082e26 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadInputChannel.java
@@ -22,7 +22,7 @@ import org.apache.ignite.client.proto.ClientMessageUnpacker;
/**
* Thin client payload input channel.
*/
-public class PayloadInputChannel {
+public class PayloadInputChannel implements AutoCloseable {
/** Client channel. */
private final ClientChannel ch;
@@ -57,4 +57,9 @@ public class PayloadInputChannel {
public ClientMessageUnpacker in() {
return in;
}
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ in.close();
+ }
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
index 0db8c1c..bf33bf6 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.client;
-import java.io.IOException;
-
import org.apache.ignite.client.proto.ClientMessagePacker;
/**
@@ -35,10 +33,11 @@ public class PayloadOutputChannel implements AutoCloseable {
* Constructor.
*
* @param ch Channel.
+ * @param out Packer.
*/
- PayloadOutputChannel(ClientChannel ch) {
- out = new ClientMessagePacker();
+ PayloadOutputChannel(ClientChannel ch, ClientMessagePacker out) {
this.ch = ch;
+ this.out = out;
}
/**
@@ -60,7 +59,7 @@ public class PayloadOutputChannel implements AutoCloseable {
}
/** {@inheritDoc} */
- @Override public void close() throws IOException {
+ @Override public void close() {
out.close();
}
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 50b7ede..cb535fb 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.client;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -30,13 +29,15 @@ import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import org.apache.ignite.client.IgniteClientAuthenticationException;
import org.apache.ignite.client.IgniteClientAuthorizationException;
import org.apache.ignite.client.IgniteClientConnectionException;
import org.apache.ignite.client.IgniteClientException;
import org.apache.ignite.client.proto.ClientErrorCode;
+import org.apache.ignite.client.proto.ClientMessageCommon;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.client.proto.ProtocolVersion;
@@ -47,7 +48,6 @@ import org.apache.ignite.internal.client.io.ClientConnectionStateHandler;
import org.apache.ignite.internal.client.io.ClientMessageHandler;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
-import org.msgpack.core.buffer.ByteBufferInput;
/**
* Implements {@link ClientChannel} over TCP.
@@ -106,7 +106,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
}
/** {@inheritDoc} */
- @Override public void onMessage(ByteBuffer buf) throws IOException {
+ @Override public void onMessage(ByteBuf buf) throws IOException {
processNextMessage(buf);
}
@@ -155,14 +155,16 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
throws IgniteClientException {
long id = reqId.getAndIncrement();
- try (PayloadOutputChannel payloadCh = new PayloadOutputChannel(this)) {
- if (closed())
- throw new IgniteClientConnectionException("Channel is closed");
+ if (closed())
+ throw new IgniteClientConnectionException("Channel is closed");
+
+ ClientRequestFuture fut = new ClientRequestFuture();
- ClientRequestFuture fut = new ClientRequestFuture();
+ pendingReqs.put(id, fut);
- pendingReqs.put(id, fut);
+ PayloadOutputChannel payloadCh = new PayloadOutputChannel(this, new ClientMessagePacker(sock.getBuffer()));
+ try {
var req = payloadCh.out();
req.packInt(opCode);
@@ -177,8 +179,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
});
return fut;
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
+ // Release the buffer manually on failure. A successful write releases the buffer automatically.
+ payloadCh.close();
pendingReqs.remove(id);
throw convertException(t);
@@ -197,8 +200,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
if (payload == null || payloadReader == null)
return null;
- try {
- return payloadReader.apply(new PayloadInputChannel(this, payload));
+ try (var in = new PayloadInputChannel(this, payload)) {
+ return payloadReader.apply(in);
} catch (Exception e) {
throw new IgniteException("Failed to deserialize server response: " + e.getMessage(), e);
}
@@ -227,8 +230,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/**
* Process next message from the input stream and complete corresponding future.
*/
- private void processNextMessage(ByteBuffer buf) throws IgniteClientException, IOException {
- var unpacker = new ClientMessageUnpacker(new ByteBufferInput(buf));
+ private void processNextMessage(ByteBuf buf) throws IgniteClientException, IOException {
+ var unpacker = new ClientMessageUnpacker(buf);
if (protocolCtx == null) {
// Process handshake.
@@ -297,18 +300,19 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/** Send handshake request. */
private void handshakeReq(ProtocolVersion proposedVer) throws IOException {
- try (var req = new ClientMessagePacker()) {
- req.packInt(proposedVer.major());
- req.packInt(proposedVer.minor());
- req.packInt(proposedVer.patch());
+ sock.send(Unpooled.wrappedBuffer(ClientMessageCommon.MAGIC_BYTES));
- req.packInt(2); // Client type: general purpose.
+ var req = new ClientMessagePacker(sock.getBuffer());
+ req.packInt(proposedVer.major());
+ req.packInt(proposedVer.minor());
+ req.packInt(proposedVer.patch());
- req.packBinaryHeader(0); // Features.
- req.packMapHeader(0); // Extensions.
+ req.packInt(2); // Client type: general purpose.
- write(req).syncUninterruptibly();
- }
+ req.packBinaryHeader(0); // Features.
+ req.packMapHeader(0); // Extensions.
+
+ write(req).syncUninterruptibly();
}
/**
@@ -322,7 +326,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/** Receive and handle handshake response. */
private void handshakeRes(ClientMessageUnpacker unpacker, ProtocolVersion proposedVer)
throws IgniteClientConnectionException, IgniteClientAuthenticationException {
- try {
+ try (unpacker) {
ProtocolVersion srvVer = new ProtocolVersion(unpacker.unpackShort(), unpacker.unpackShort(),
unpacker.unpackShort());
@@ -363,7 +367,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/** Write bytes to the output stream. */
private ChannelFuture write(ClientMessagePacker packer) throws IgniteClientConnectionException {
- var buf = packer.toMessageBuffer().sliceAsByteBuffer();
+ var buf = packer.getBuffer();
return sock.send(buf);
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java
index 8e25329..67ea66e 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnection.java
@@ -17,8 +17,7 @@
package org.apache.ignite.internal.client.io;
-import java.nio.ByteBuffer;
-
+import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import org.apache.ignite.lang.IgniteException;
@@ -32,7 +31,14 @@ public interface ClientConnection extends AutoCloseable {
* @param msg Message buffer.
* @return Future for the operation.
*/
- ChannelFuture send(ByteBuffer msg) throws IgniteException;
+ ChannelFuture send(ByteBuf msg) throws IgniteException;
+
+ /**
+ * Gets a buffer to write to.
+ *
+ * @return Buffer.
+ */
+ ByteBuf getBuffer();
/**
* Closes the connection.
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
index da437a3..dc93455 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.client.io;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
/**
* Handles thin client responses and server to client notifications.
@@ -30,5 +30,5 @@ public interface ClientMessageHandler {
* @param buf Buffer.
* @throws IOException on failure.
*/
- void onMessage(ByteBuffer buf) throws IOException;
+ void onMessage(ByteBuf buf) throws IOException;
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
index f5955aa..6448460 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
@@ -18,8 +18,8 @@
package org.apache.ignite.internal.client.io.netty;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
@@ -60,11 +60,17 @@ public class NettyClientConnection implements ClientConnection {
}
/** {@inheritDoc} */
- @Override public ChannelFuture send(ByteBuffer msg) throws IgniteException {
+ @Override public ChannelFuture send(ByteBuf msg) throws IgniteException {
+ // writeAndFlush releases pooled buffer.
return channel.writeAndFlush(msg);
}
/** {@inheritDoc} */
+ @Override public ByteBuf getBuffer() {
+ return channel.alloc().buffer();
+ }
+
+ /** {@inheritDoc} */
@Override public void close() {
channel.close();
}
@@ -75,7 +81,7 @@ public class NettyClientConnection implements ClientConnection {
* @param buf Message.
* @throws IOException when message can't be decoded.
*/
- void onMessage(ByteBuffer buf) throws IOException {
+ void onMessage(ByteBuf buf) throws IOException {
msgHnd.onMessage(buf);
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
index c9cc4c8..b71fbcb 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.client.io.netty;
import java.net.InetSocketAddress;
-
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
@@ -28,7 +27,6 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.ignite.client.IgniteClientConnectionException;
import org.apache.ignite.client.proto.ClientMessageDecoder;
-import org.apache.ignite.client.proto.ClientMessageEncoder;
import org.apache.ignite.internal.client.io.ClientConnection;
import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.client.io.ClientConnectionStateHandler;
@@ -65,7 +63,6 @@ public class NettyClientConnectionMultiplexer implements ClientConnectionMultipl
throws Exception {
ch.pipeline().addLast(
new ClientMessageDecoder(),
- new ClientMessageEncoder(),
new NettyClientMessageHandler());
}
});
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientMessageHandler.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientMessageHandler.java
index 670703c..840d560 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientMessageHandler.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientMessageHandler.java
@@ -18,8 +18,7 @@
package org.apache.ignite.internal.client.io.netty;
import java.io.IOException;
-import java.nio.ByteBuffer;
-
+import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -31,11 +30,11 @@ import static org.apache.ignite.internal.client.io.netty.NettyClientConnection.A
public class NettyClientMessageHandler extends ChannelInboundHandlerAdapter {
/** {@inheritDoc} */
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
- ctx.channel().attr(ATTR_CONN).get().onMessage((ByteBuffer) msg);
+ ctx.channel().attr(ATTR_CONN).get().onMessage((ByteBuf) msg);
}
/** {@inheritDoc} */
- @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ @Override public void channelInactive(ChannelHandlerContext ctx) {
ctx.channel().attr(ATTR_CONN).get().onDisconnected(null);
}
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index a226ef6..dbaf32c 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -17,9 +17,10 @@
package org.apache.ignite.client;
+import java.net.InetSocketAddress;
import java.util.Collections;
-
import io.netty.channel.ChannelFuture;
+import io.netty.util.ResourceLeakDetector;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.handler.ClientHandlerModule;
@@ -42,6 +43,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
public abstract class AbstractClientTest {
protected static final String DEFAULT_TABLE = "default_test_table";
+ protected static ConfigurationRegistry configurationRegistry;
+
protected static ChannelFuture serverFuture;
protected static Ignite server;
@@ -50,6 +53,8 @@ public abstract class AbstractClientTest {
@BeforeAll
public static void beforeAll() throws Exception {
+ ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+
serverFuture = startServer(null);
client = startClient();
}
@@ -59,6 +64,7 @@ public abstract class AbstractClientTest {
client.close();
serverFuture.cancel(true);
serverFuture.await();
+ configurationRegistry.stop();
}
@BeforeEach
@@ -68,8 +74,11 @@ public abstract class AbstractClientTest {
}
public static Ignite startClient(String... addrs) {
- if (addrs == null || addrs.length == 0)
- addrs = new String[]{"127.0.0.2:10800"};
+ if (addrs == null || addrs.length == 0) {
+ var serverPort = ((InetSocketAddress)serverFuture.channel().localAddress()).getPort();
+
+ addrs = new String[]{"127.0.0.2:" + serverPort};
+ }
var builder = IgniteClient.builder().addresses(addrs);
@@ -77,19 +86,19 @@ public abstract class AbstractClientTest {
}
public static ChannelFuture startServer(String host) throws InterruptedException {
- var registry = new ConfigurationRegistry(
+ configurationRegistry = new ConfigurationRegistry(
Collections.singletonList(ClientConnectorConfiguration.KEY),
Collections.emptyMap(),
Collections.singletonList(new TestConfigurationStorage(ConfigurationType.LOCAL))
);
- registry.start();
+ configurationRegistry.start();
server = new FakeIgnite();
var module = new ClientHandlerModule(server, NOPLogger.NOP_LOGGER);
- module.prepareStart(registry);
+ module.prepareStart(configurationRegistry);
return module.start();
}