You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/08/06 20:37:02 UTC
[ignite-3] branch main updated: IGNITE-15224 Thin 3.0: Implement Table API operations (#263)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ab4a585 IGNITE-15224 Thin 3.0: Implement Table API operations (#263)
ab4a585 is described below
commit ab4a585c4fbc5b7df81011fc3f72bf10254dde66
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Fri Aug 6 23:36:54 2021 +0300
IGNITE-15224 Thin 3.0: Implement Table API operations (#263)
* Implement all Table API operations except invoke and transactions.
* Override packer/unpacker methods without checked exceptions for cleaner async code.
---
.../ignite/client/proto/ClientMessagePacker.java | 257 +++++++++++++++-
.../ignite/client/proto/ClientMessageUnpacker.java | 308 ++++++++++++++++++-
.../org/apache/ignite/client/proto/ClientOp.java | 60 ++++
.../handler/ClientInboundMessageHandler.java | 81 ++++-
.../requests/table/ClientSchemasGetRequest.java | 4 +-
.../handler/requests/table/ClientTableCommon.java | 218 ++++++++++++--
.../requests/table/ClientTableDropRequest.java | 7 +-
.../requests/table/ClientTableGetRequest.java | 17 +-
.../requests/table/ClientTablesGetRequest.java | 16 +-
....java => ClientTupleDeleteAllExactRequest.java} | 16 +-
...quest.java => ClientTupleDeleteAllRequest.java} | 16 +-
...est.java => ClientTupleDeleteExactRequest.java} | 13 +-
...tRequest.java => ClientTupleDeleteRequest.java} | 13 +-
...tRequest.java => ClientTupleGetAllRequest.java} | 16 +-
...st.java => ClientTupleGetAndDeleteRequest.java} | 12 +-
...t.java => ClientTupleGetAndReplaceRequest.java} | 12 +-
...ClientTupleGetAndReplaceSchemalessRequest.java} | 14 +-
...st.java => ClientTupleGetAndUpsertRequest.java} | 12 +-
... ClientTupleGetAndUpsertSchemalessRequest.java} | 14 +-
.../requests/table/ClientTupleGetRequest.java | 4 +-
...quest.java => ClientTupleInsertAllRequest.java} | 16 +-
... => ClientTupleInsertAllSchemalessRequest.java} | 16 +-
...tRequest.java => ClientTupleInsertRequest.java} | 13 +-
...ava => ClientTupleInsertSchemalessRequest.java} | 15 +-
...st.java => ClientTupleReplaceExactRequest.java} | 17 +-
... ClientTupleReplaceExactSchemalessRequest.java} | 17 +-
...Request.java => ClientTupleReplaceRequest.java} | 13 +-
...va => ClientTupleReplaceSchemalessRequest.java} | 15 +-
...quest.java => ClientTupleUpsertAllRequest.java} | 14 +-
... => ClientTupleUpsertAllSchemalessRequest.java} | 14 +-
.../requests/table/ClientTupleUpsertRequest.java | 4 +-
.../table/ClientTupleUpsertSchemalessRequest.java | 6 +-
.../ignite/internal/client/TcpClientChannel.java | 24 +-
.../internal/client/io/ClientMessageHandler.java | 2 +-
.../ignite/internal/client/table/ClientTable.java | 331 +++++++++++++++++----
.../ignite/internal/client/table/ClientTables.java | 14 +
.../internal/client/table/ClientTupleBuilder.java | 12 +-
.../apache/ignite/client/AbstractClientTest.java | 21 +-
.../org/apache/ignite/client/ClientTableTest.java | 319 ++++++++++++++++++--
.../java/org/apache/ignite/client/CustomTuple.java | 175 +++++++++++
.../ignite/client/fakes/FakeIgniteTables.java | 34 ++-
.../ignite/client/fakes/FakeInternalTable.java | 110 ++++++-
.../ignite/client/fakes/FakeSchemaRegistry.java | 97 ++++++
43 files changed, 2001 insertions(+), 408 deletions(-)
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java
index 9cc79c4..facefa2 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessagePacker.java
@@ -18,16 +18,18 @@
package org.apache.ignite.client.proto;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.UUID;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
-import org.apache.ignite.lang.IgniteException;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.core.buffer.OutputStreamBufferOutput;
+import org.msgpack.value.Value;
import static org.apache.ignite.client.proto.ClientMessageCommon.HEADER_SIZE;
@@ -49,6 +51,7 @@ public class ClientMessagePacker extends MessagePacker {
* @param buf Buffer.
*/
public ClientMessagePacker(ByteBuf buf) {
+ // TODO: Remove intermediate classes and buffers IGNITE-15234.
// Reserve 4 bytes for the message length.
super(new OutputStreamBufferOutput(new ByteBufOutputStream(buf.writerIndex(HEADER_SIZE))),
MessagePack.DEFAULT_PACKER_CONFIG);
@@ -60,14 +63,14 @@ public class ClientMessagePacker extends MessagePacker {
* Gets the underlying buffer.
*
* @return Underlying buffer.
- * @throws IgniteException When flush fails.
+ * @throws UncheckedIOException When flush fails.
*/
public ByteBuf getBuffer() {
try {
flush();
}
catch (IOException e) {
- throw new IgniteException(e);
+ throw new UncheckedIOException(e);
}
buf.setInt(0, buf.writerIndex() - HEADER_SIZE);
@@ -75,16 +78,238 @@ public class ClientMessagePacker extends MessagePacker {
return buf;
}
+ /** {@inheritDoc} */
+ @Override public MessagePacker packNil() {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packNil();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packBoolean(boolean b) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packBoolean(b);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packByte(byte b) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packByte(b);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packShort(short v) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packShort(v);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packInt(int r) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packInt(r);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packLong(long v) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packLong(v);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packBigInteger(BigInteger bi) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packBigInteger(bi);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packFloat(float v) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packFloat(v);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packDouble(double v) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packDouble(v);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packString(String s) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packString(s);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packArrayHeader(int arraySize) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packArrayHeader(arraySize);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packMapHeader(int mapSize) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packMapHeader(mapSize);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packValue(Value v) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packValue(v);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packExtensionTypeHeader(byte extType, int payloadLen) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packExtensionTypeHeader(extType, payloadLen);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packBinaryHeader(int len) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packBinaryHeader(len);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker packRawStringHeader(int len) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.packRawStringHeader(len);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker writePayload(byte[] src) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.writePayload(src);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker writePayload(byte[] src, int off, int len) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.writePayload(src, off, len);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker addPayload(byte[] src) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.addPayload(src);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessagePacker addPayload(byte[] src, int off, int len) {
+ assert !closed : "Packer is closed";
+
+ try {
+ return super.addPayload(src, off, len);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
/**
* Writes an UUID.
*
* @param val UUID value.
* @return This instance.
- * @throws IOException when underlying output throws IOException.
*/
- public ClientMessagePacker packUuid(UUID val) throws IOException {
+ public ClientMessagePacker packUuid(UUID val) {
+ assert !closed : "Packer is closed";
+
packExtensionTypeHeader(ClientMsgPackType.UUID, 16);
+ // TODO: Pack directly to ByteBuf without allocating IGNITE-15234.
var bytes = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(bytes);
@@ -101,10 +326,12 @@ public class ClientMessagePacker extends MessagePacker {
*
* @param val Decimal value.
* @return This instance.
- * @throws IOException when underlying output throws IOException.
+ * @throws UnsupportedOperationException Not supported.
*/
- public ClientMessagePacker packDecimal(BigDecimal val) throws IOException {
- throw new IOException("TODO: IGNITE-15163");
+ public ClientMessagePacker packDecimal(BigDecimal val) {
+ assert !closed : "Packer is closed";
+
+ throw new UnsupportedOperationException("TODO: IGNITE-15163");
}
/**
@@ -112,10 +339,12 @@ public class ClientMessagePacker extends MessagePacker {
*
* @param val Bit set value.
* @return This instance.
- * @throws IOException when underlying output throws IOException.
+ * @throws UnsupportedOperationException Not supported.
*/
- public ClientMessagePacker packBitSet(BitSet val) throws IOException {
- throw new IOException("TODO: IGNITE-15163");
+ public ClientMessagePacker packBitSet(BitSet val) {
+ assert !closed : "Packer is closed";
+
+ throw new UnsupportedOperationException("TODO: IGNITE-15163");
}
/**
@@ -123,9 +352,9 @@ public class ClientMessagePacker extends MessagePacker {
*
* @param val Object value.
* @return This instance.
- * @throws IOException when underlying output throws IOException.
+ * @throws UnsupportedOperationException When type is not supported.
*/
- public ClientMessagePacker packObject(Object val) throws IOException {
+ public ClientMessagePacker packObject(Object val) {
if (val == null)
return (ClientMessagePacker) packNil();
@@ -156,7 +385,7 @@ public class ClientMessagePacker extends MessagePacker {
return packBitSet((BitSet) val);
// TODO: Support all basic types IGNITE-15163
- throw new IOException("Unsupported type, can't serialize: " + val.getClass());
+ throw new UnsupportedOperationException("Unsupported type, can't serialize: " + val.getClass());
}
/** {@inheritDoc} */
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java
index 6a2c76a..a058c9c 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientMessageUnpacker.java
@@ -18,18 +18,23 @@
package org.apache.ignite.client.proto;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.UUID;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import org.apache.ignite.lang.IgniteException;
+import org.msgpack.core.ExtensionTypeHeader;
+import org.msgpack.core.MessageFormat;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageSizeException;
import org.msgpack.core.MessageTypeException;
import org.msgpack.core.MessageUnpacker;
import org.msgpack.core.buffer.InputStreamBufferInput;
+import org.msgpack.value.ImmutableValue;
import static org.apache.ignite.client.proto.ClientDataType.BITMASK;
import static org.apache.ignite.client.proto.ClientDataType.BYTES;
@@ -51,8 +56,11 @@ public class ClientMessageUnpacker extends MessageUnpacker {
/** Underlying buffer. */
private final ByteBuf buf;
- /** Closed flag. */
- private boolean closed = false;
+ /** Underlying input. */
+ private final InputStreamBufferInput in;
+
+ /** Ref count. */
+ private int refCnt = 1;
/**
* Constructor.
@@ -60,20 +68,258 @@ public class ClientMessageUnpacker extends MessageUnpacker {
* @param buf Input.
*/
public ClientMessageUnpacker(ByteBuf buf) {
- super(new InputStreamBufferInput(new ByteBufInputStream(buf)), MessagePack.DEFAULT_UNPACKER_CONFIG);
+ // TODO: Remove intermediate classes and buffers IGNITE-15234.
+ this(new InputStreamBufferInput(new ByteBufInputStream(buf)), buf);
+ }
+ private ClientMessageUnpacker(InputStreamBufferInput in, ByteBuf buf) {
+ super(in, MessagePack.DEFAULT_UNPACKER_CONFIG);
+
+ this.in = in;
this.buf = buf;
}
+ /** {@inheritDoc} */
+ @Override public int unpackInt() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackInt();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String unpackString() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackString();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unpackNil() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ super.unpackNil();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean unpackBoolean() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackBoolean();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte unpackByte() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackByte();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public short unpackShort() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackShort();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long unpackLong() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackLong();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public BigInteger unpackBigInteger() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackBigInteger();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public float unpackFloat() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackFloat();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public double unpackDouble() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackDouble();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int unpackArrayHeader() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackArrayHeader();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int unpackMapHeader() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackMapHeader();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExtensionTypeHeader unpackExtensionTypeHeader() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackExtensionTypeHeader();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int unpackBinaryHeader() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackBinaryHeader();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tryUnpackNil() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.tryUnpackNil();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] readPayload(int length) {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.readPayload(length);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessageFormat getNextFormat() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.getNextFormat();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void skipValue(int count) {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ super.skipValue(count);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void skipValue() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ super.skipValue();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.hasNext();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ImmutableValue unpackValue() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ try {
+ return super.unpackValue();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
/**
* Reads an UUID.
*
* @return UUID value.
- * @throws IOException when underlying input throws IOException.
* @throws MessageTypeException when type is not UUID.
* @throws MessageSizeException when size is not correct.
*/
- public UUID unpackUuid() throws IOException {
+ public UUID unpackUuid() {
+ assert refCnt > 0 : "Unpacker is closed";
+
var hdr = unpackExtensionTypeHeader();
var type = hdr.getType();
var len = hdr.getLength();
@@ -95,20 +341,24 @@ public class ClientMessageUnpacker extends MessageUnpacker {
* Reads a decimal.
*
* @return Decimal value.
- * @throws IOException when underlying input throws IOException.
+ * @throws UnsupportedOperationException Not supported yet.
*/
- public BigDecimal unpackDecimal() throws IOException {
- throw new IOException("TODO: IGNITE-15163");
+ public BigDecimal unpackDecimal() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ throw new UnsupportedOperationException("TODO: IGNITE-15163");
}
/**
* Reads a bit set.
*
* @return Bit set.
- * @throws IOException when underlying input throws IOException.
+ * @throws UnsupportedOperationException Not supported yet.
*/
- public BitSet unpackBitSet() throws IOException {
- throw new IOException("TODO: IGNITE-15163");
+ public BitSet unpackBitSet() {
+ assert refCnt > 0 : "Unpacker is closed";
+
+ throw new UnsupportedOperationException("TODO: IGNITE-15163");
}
/**
@@ -117,10 +367,9 @@ public class ClientMessageUnpacker extends MessageUnpacker {
* @param dataType Data type code.
*
* @return Unpacked object.
- * @throws IOException when underlying input throws IOException.
* @throws IgniteException when data type is not valid.
*/
- public Object unpackObject(int dataType) throws IOException {
+ public Object unpackObject(int dataType) {
if (tryUnpackNil())
return null;
@@ -164,12 +413,41 @@ public class ClientMessageUnpacker extends MessageUnpacker {
throw new IgniteException("Unknown client data type: " + dataType);
}
+ /**
+ * Creates a copy of this unpacker and the underlying buffer.
+ *
+ * @return Copied unpacker.
+ * @throws UncheckedIOException When buffer operation fails.
+ */
+ public ClientMessageUnpacker copy() {
+ try {
+ in.reset(new ByteBufInputStream(buf.copy()));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ return this;
+ }
+
+ /**
+ * Increases the reference count by {@code 1}.
+ *
+ * @return This instance.
+ */
+ public ClientMessageUnpacker retain() {
+ refCnt++;
+
+ buf.retain();
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public void close() {
- if (closed)
+ if (refCnt == 0)
return;
- closed = true;
+ refCnt--;
if (buf.refCnt() > 0)
buf.release();
diff --git a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientOp.java b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientOp.java
index d7df3cd..00f7418 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientOp.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/client/proto/ClientOp.java
@@ -44,4 +44,64 @@ public class ClientOp {
/** Get tuple. */
public static final int TUPLE_GET = 12;
+
+ /** Upsert all tuples. */
+ public static final int TUPLE_UPSERT_ALL = 13;
+
+ /** Upsert all tuples without schema. */
+ public static final int TUPLE_UPSERT_ALL_SCHEMALESS = 14;
+
+ /** Get all tuples. */
+ public static final int TUPLE_GET_ALL = 15;
+
+ /** Get and upsert tuple. */
+ public static final int TUPLE_GET_AND_UPSERT = 16;
+
+ /** Get and upsert tuple without schema. */
+ public static final int TUPLE_GET_AND_UPSERT_SCHEMALESS = 17;
+
+ /** Insert tuple. */
+ public static final int TUPLE_INSERT = 18;
+
+ /** Insert tuple without schema. */
+ public static final int TUPLE_INSERT_SCHEMALESS = 19;
+
+ /** Insert all tuples. */
+ public static final int TUPLE_INSERT_ALL = 20;
+
+ /** Insert all tuples without schema. */
+ public static final int TUPLE_INSERT_ALL_SCHEMALESS = 21;
+
+ /** Replace tuple. */
+ public static final int TUPLE_REPLACE = 22;
+
+ /** Replace tuple without schema. */
+ public static final int TUPLE_REPLACE_SCHEMALESS = 23;
+
+ /** Replace exact tuple. */
+ public static final int TUPLE_REPLACE_EXACT = 24;
+
+ /** Replace exact tuple without schema. */
+ public static final int TUPLE_REPLACE_EXACT_SCHEMALESS = 25;
+
+ /** Get and replace tuple. */
+ public static final int TUPLE_GET_AND_REPLACE = 26;
+
+ /** Get and replace tuple without schema. */
+ public static final int TUPLE_GET_AND_REPLACE_SCHEMALESS = 27;
+
+ /** Delete tuple. */
+ public static final int TUPLE_DELETE = 28;
+
+ /** Delete all tuples. */
+ public static final int TUPLE_DELETE_ALL = 29;
+
+ /** Delete exact tuple. */
+ public static final int TUPLE_DELETE_EXACT = 30;
+
+ /** Delete all exact tuples. */
+ public static final int TUPLE_DELETE_ALL_EXACT = 31;
+
+ /** Get and delete tuple. */
+ public static final int TUPLE_GET_AND_DELETE = 32;
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index b5436b2..f6ccd1a 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -30,7 +30,27 @@ import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableDropRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTablesGetRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteAllExactRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteAllRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteExactRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleDeleteRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleGetAllRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleGetAndDeleteRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleGetAndReplaceRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleGetAndReplaceSchemalessRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleGetAndUpsertRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleGetAndUpsertSchemalessRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleGetRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleInsertAllRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleInsertAllSchemalessRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleInsertRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleInsertSchemalessRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleReplaceExactRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleReplaceExactSchemalessRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleReplaceRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleReplaceSchemalessRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertAllRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertAllSchemalessRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertRequest;
import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertSchemalessRequest;
import org.apache.ignite.client.proto.ClientErrorCode;
@@ -218,7 +238,6 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
ClientMessagePacker out,
int opCode
) throws IOException {
- // TODO: Handle all operations asynchronously (add async table API).
switch (opCode) {
case ClientOp.TABLE_DROP:
return ClientTableDropRequest.process(in, ignite.tables());
@@ -241,6 +260,66 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
case ClientOp.TUPLE_GET:
return ClientTupleGetRequest.process(in, out, ignite.tables());
+ case ClientOp.TUPLE_UPSERT_ALL:
+ return ClientTupleUpsertAllRequest.process(in, ignite.tables());
+
+ case ClientOp.TUPLE_UPSERT_ALL_SCHEMALESS:
+ return ClientTupleUpsertAllSchemalessRequest.process(in, ignite.tables());
+
+ case ClientOp.TUPLE_GET_ALL:
+ return ClientTupleGetAllRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_GET_AND_UPSERT:
+ return ClientTupleGetAndUpsertRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_GET_AND_UPSERT_SCHEMALESS:
+ return ClientTupleGetAndUpsertSchemalessRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_INSERT:
+ return ClientTupleInsertRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_INSERT_SCHEMALESS:
+ return ClientTupleInsertSchemalessRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_INSERT_ALL:
+ return ClientTupleInsertAllRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_INSERT_ALL_SCHEMALESS:
+ return ClientTupleInsertAllSchemalessRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_REPLACE:
+ return ClientTupleReplaceRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_REPLACE_SCHEMALESS:
+ return ClientTupleReplaceSchemalessRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_REPLACE_EXACT:
+ return ClientTupleReplaceExactRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_REPLACE_EXACT_SCHEMALESS:
+ return ClientTupleReplaceExactSchemalessRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_GET_AND_REPLACE:
+ return ClientTupleGetAndReplaceRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_GET_AND_REPLACE_SCHEMALESS:
+ return ClientTupleGetAndReplaceSchemalessRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_DELETE:
+ return ClientTupleDeleteRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_DELETE_ALL:
+ return ClientTupleDeleteAllRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_DELETE_EXACT:
+ return ClientTupleDeleteExactRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_DELETE_ALL_EXACT:
+ return ClientTupleDeleteAllExactRequest.process(in, out, ignite.tables());
+
+ case ClientOp.TUPLE_GET_AND_DELETE:
+ return ClientTupleGetAndDeleteRequest.process(in, out, ignite.tables());
+
default:
throw new IgniteException("Unexpected operation code: " + opCode);
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientSchemasGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientSchemasGetRequest.java
index e89e68a..86cd6a8 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientSchemasGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientSchemasGetRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
@@ -39,14 +38,13 @@ public class ClientSchemasGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
* @throws IgniteException When schema registry is not initialized.
*/
public static CompletableFuture<Object> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
if (in.getNextFormat() == MessageFormat.NIL) {
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 74ef6c2..9cc3dc3 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -17,9 +17,10 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.math.BigDecimal;
+import java.util.ArrayList;
import java.util.BitSet;
+import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.client.proto.ClientDataType;
import org.apache.ignite.client.proto.ClientMessagePacker;
@@ -34,6 +35,7 @@ import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.TupleBuilder;
import org.apache.ignite.table.manager.IgniteTables;
+import org.jetbrains.annotations.NotNull;
import org.msgpack.core.MessageFormat;
/**
@@ -46,13 +48,8 @@ class ClientTableCommon {
* @param packer Packer.
* @param schemaVer Schema version.
* @param schema Schema.
- * @throws IOException When serialization fails.
*/
- public static void writeSchema(
- ClientMessagePacker packer,
- int schemaVer,
- SchemaDescriptor schema
- ) throws IOException {
+ public static void writeSchema(ClientMessagePacker packer,int schemaVer, SchemaDescriptor schema) {
packer.packInt(schemaVer);
if (schema == null) {
@@ -80,28 +77,126 @@ class ClientTableCommon {
*
* @param packer Packer.
* @param tuple Tuple.
- * @throws IgniteException on failed serialization.
*/
public static void writeTuple(ClientMessagePacker packer, Tuple tuple) {
- try {
- if (tuple == null) {
- packer.packNil();
+ if (tuple == null) {
+ packer.packNil();
- return;
- }
+ return;
+ }
- var schema = ((SchemaAware) tuple).schema();
+ var schema = ((SchemaAware) tuple).schema();
+ writeTuple(packer, tuple, schema);
+ }
+
+ /**
+ * Writes a tuple.
+ *
+ * @param packer Packer.
+ * @param tuple Tuple.
+ * @param schema Tuple schema.
+ * @throws IgniteException on failed serialization.
+ */
+ public static void writeTuple(
+ ClientMessagePacker packer,
+ Tuple tuple,
+ SchemaDescriptor schema
+ ) {
+ writeTuple(packer, tuple, schema, false, false);
+ }
+
+ /**
+ * Writes a tuple.
+ *
+ * @param packer Packer.
+ * @param tuple Tuple.
+ * @param schema Tuple schema.
+ * @param skipHeader Whether to skip the tuple header (the schema version).
+ * @throws IgniteException on failed serialization.
+ */
+ public static void writeTuple(
+ ClientMessagePacker packer,
+ Tuple tuple,
+ SchemaDescriptor schema,
+ boolean skipHeader
+ ) {
+ writeTuple(packer, tuple, schema, skipHeader, false);
+ }
+
+ /**
+ * Writes a tuple.
+ *
+ * @param packer Packer.
+ * @param tuple Tuple.
+ * @param schema Tuple schema.
+ * @param skipHeader Whether to skip the tuple header (the schema version).
+ * @param keyOnly Whether to write key fields only.
+ * @throws IgniteException on failed serialization.
+ */
+ public static void writeTuple(
+ ClientMessagePacker packer,
+ Tuple tuple,
+ SchemaDescriptor schema,
+ boolean skipHeader,
+ boolean keyOnly
+ ) {
+ if (tuple == null) {
+ packer.packNil();
+
+ return;
+ }
+
+ if (!skipHeader)
packer.packInt(schema.version());
- for (var col : schema.keyColumns().columns())
- writeColumnValue(packer, tuple, col);
+ for (var col : schema.keyColumns().columns())
+ writeColumnValue(packer, tuple, col);
+ if (!keyOnly) {
for (var col : schema.valueColumns().columns())
writeColumnValue(packer, tuple, col);
}
- catch (Throwable t) {
- throw new IgniteException("Failed to serialize tuple", t);
+ }
+
+ /**
+ * Writes multiple tuples.
+ *
+ * @param packer Packer.
+ * @param tuples Tuples.
+ * @throws IgniteException on failed serialization.
+ */
+ public static void writeTuples(ClientMessagePacker packer, Collection<Tuple> tuples) {
+ writeTuples(packer, tuples, false);
+ }
+
+ /**
+ * Writes multiple tuples.
+ *
+ * @param packer Packer.
+ * @param tuples Tuples.
+ * @param keyOnly Whether to write key fields only.
+ * @throws IgniteException on failed serialization.
+ */
+ public static void writeTuples(ClientMessagePacker packer, Collection<Tuple> tuples, boolean keyOnly) {
+ if (tuples == null || tuples.isEmpty()) {
+ packer.packNil();
+
+ return;
+ }
+
+ SchemaDescriptor schema = null;
+
+ for (Tuple tuple : tuples) {
+ if (schema == null) {
+ schema = ((SchemaAware) tuple).schema();
+
+ packer.packInt(schema.version());
+ packer.packInt(tuples.size());
+ } else
+ assert schema.version() == ((SchemaAware) tuple).schema().version();
+
+ writeTuple(packer, tuple, schema, true, keyOnly);
}
}
@@ -112,11 +207,61 @@ class ClientTableCommon {
* @param table Table.
* @param keyOnly Whether only key fields are expected.
* @return Tuple.
- * @throws IOException When deserialization fails.
*/
- public static Tuple readTuple(ClientMessageUnpacker unpacker, TableImpl table, boolean keyOnly) throws IOException {
+ public static Tuple readTuple(ClientMessageUnpacker unpacker, TableImpl table, boolean keyOnly) {
+ SchemaDescriptor schema = readSchema(unpacker, table);
+
+ return readTuple(unpacker, table, keyOnly, schema);
+ }
+
+ /**
+ * Reads multiple tuples.
+ *
+ * @param unpacker Unpacker.
+ * @param table Table.
+ * @param keyOnly Whether only key fields are expected.
+ * @return Tuples.
+ */
+ public static ArrayList<Tuple> readTuples(ClientMessageUnpacker unpacker, TableImpl table, boolean keyOnly) {
+ SchemaDescriptor schema = readSchema(unpacker, table);
+
+ var rowCnt = unpacker.unpackInt();
+ var res = new ArrayList<Tuple>(rowCnt);
+
+ for (int i = 0; i < rowCnt; i++)
+ res.add(readTuple(unpacker, table, keyOnly, schema));
+
+ return res;
+ }
+
+ /**
+ * Reads schema.
+ *
+ * @param unpacker Unpacker.
+ * @param table Table.
+ * @return Schema descriptor.
+ */
+ @NotNull public static SchemaDescriptor readSchema(ClientMessageUnpacker unpacker, TableImpl table) {
var schemaId = unpacker.unpackInt();
- var schema = table.schemaView().schema(schemaId);
+
+ return table.schemaView().schema(schemaId);
+ }
+
+ /**
+ * Reads a tuple.
+ *
+ * @param unpacker Unpacker.
+ * @param table Table.
+ * @param keyOnly Whether only key fields are expected.
+ * @param schema Tuple schema.
+ * @return Tuple.
+ */
+ public static Tuple readTuple(
+ ClientMessageUnpacker unpacker,
+ TableImpl table,
+ boolean keyOnly,
+ SchemaDescriptor schema
+ ) {
var builder = table.tupleBuilder();
var cnt = keyOnly ? schema.keyColumns().length() : schema.length();
@@ -139,15 +284,15 @@ class ClientTableCommon {
* @param unpacker Unpacker.
* @param table Table.
* @return Tuple.
- * @throws IOException When deserialization fails.
*/
- public static Tuple readTupleSchemaless(ClientMessageUnpacker unpacker, TableImpl table) throws IOException {
+ public static Tuple readTupleSchemaless(ClientMessageUnpacker unpacker, TableImpl table) {
var cnt = unpacker.unpackMapHeader();
var builder = table.tupleBuilder();
for (int i = 0; i < cnt; i++) {
var colName = unpacker.unpackString();
+ // TODO: Unpack value as object IGNITE-15194.
builder.set(colName, unpacker.unpackValue());
}
@@ -155,21 +300,36 @@ class ClientTableCommon {
}
/**
+ * Reads multiple tuples, each encoded as a map, without schema.
+ *
+ * @param unpacker Unpacker.
+ * @param table Table.
+ * @return Tuples.
+ */
+ public static ArrayList<Tuple> readTuplesSchemaless(ClientMessageUnpacker unpacker, TableImpl table) {
+ var rowCnt = unpacker.unpackArrayHeader();
+ var res = new ArrayList<Tuple>(rowCnt);
+
+ for (int i = 0; i < rowCnt; i++)
+ res.add(readTupleSchemaless(unpacker, table));
+
+ return res;
+ }
+
+ /**
* Reads a table.
*
* @param unpacker Unpacker.
* @param tables Ignite tables.
* @return Table.
- * @throws IOException When deserialization fails.
*/
- public static TableImpl readTable(ClientMessageUnpacker unpacker, IgniteTables tables) throws IOException {
+ public static TableImpl readTable(ClientMessageUnpacker unpacker, IgniteTables tables) {
var tableId = unpacker.unpackUuid();
return ((IgniteTablesInternal)tables).table(tableId);
}
- private static void readAndSetColumnValue(ClientMessageUnpacker unpacker, TupleBuilder builder, Column col)
- throws IOException {
+ private static void readAndSetColumnValue(ClientMessageUnpacker unpacker, TupleBuilder builder, Column col) {
builder.set(col.name(), unpacker.unpackObject(getClientDataType(col.type().spec())));
}
@@ -212,8 +372,8 @@ class ClientTableCommon {
throw new IgniteException("Unsupported native type: " + spec);
}
- private static void writeColumnValue(ClientMessagePacker packer, Tuple tuple, Column col) throws IOException {
- var val = tuple.value(col.name());
+ private static void writeColumnValue(ClientMessagePacker packer, Tuple tuple, Column col) {
+ var val = tuple.valueOrDefault(col.name(), null);
if (val == null) {
packer.packNil();
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
index 5304ae3..effa1f8 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableDropRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
@@ -32,12 +31,8 @@ public class ClientTableDropRequest {
* @param in Unpacker.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
- public static CompletableFuture<Void> process(
- ClientMessageUnpacker in,
- IgniteTables tables
- ) throws IOException {
+ public static CompletableFuture<Void> process(ClientMessageUnpacker in, IgniteTables tables) {
var tableName = in.unpackString();
return tables.dropTableAsync(tableName);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java
index da39738..eaaa7c2 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableGetRequest.java
@@ -17,9 +17,7 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.table.TableImpl;
@@ -36,24 +34,19 @@ public class ClientTableGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
String tableName = in.unpackString();
return tables.tableAsync(tableName).thenAccept(table -> {
- try {
- if (table == null)
- out.packNil();
- else
- out.packUuid(((TableImpl) table).tableId());
- } catch (IOException e) {
- throw new CompletionException(e);
- }
+ if (table == null)
+ out.packNil();
+ else
+ out.packUuid(((TableImpl) table).tableId());
});
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java
index a7000b0..ffb3737 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTablesGetRequest.java
@@ -17,9 +17,7 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.table.manager.IgniteTables;
@@ -40,17 +38,13 @@ public class ClientTablesGetRequest {
IgniteTables igniteTables
) {
return igniteTables.tablesAsync().thenAccept(tables -> {
- try {
- out.packMapHeader(tables.size());
+ out.packMapHeader(tables.size());
- for (var table : tables) {
- var tableImpl = (TableImpl) table;
+ for (var table : tables) {
+ var tableImpl = (TableImpl) table;
- out.packUuid(tableImpl.tableId());
- out.packString(table.tableName());
- }
- } catch (IOException e) {
- throw new CompletionException(e);
+ out.packUuid(tableImpl.tableId());
+ out.packString(table.tableName());
}
});
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
similarity index 82%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
index bc3bce0..0c16f58 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
@@ -17,20 +17,19 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuples;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuples;
/**
- * Client tuple get request.
+ * Client tuple delete all exact request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleDeleteAllExactRequest {
/**
* Processes the request.
*
@@ -38,16 +37,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuples = readTuples(in, table, false);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.deleteAllExactAsync(tuples).thenAccept(skippedTuples -> writeTuples(out, skippedTuples));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
similarity index 82%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
index bc3bce0..332f90c 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
@@ -17,20 +17,19 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuples;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuples;
/**
- * Client tuple get request.
+ * Client tuple delete all request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleDeleteAllRequest {
/**
* Processes the request.
*
@@ -38,16 +37,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuples = readTuples(in, table, true);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.deleteAllAsync(tuples).thenAccept(skippedTuples -> writeTuples(out, skippedTuples, true));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
similarity index 80%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
index bc3bce0..73c34e5 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
@@ -25,12 +24,11 @@ import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
/**
- * Client tuple get request.
+ * Client tuple delete exact request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleDeleteExactRequest {
/**
* Processes the request.
*
@@ -38,16 +36,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuple = readTuple(in, table, false);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.deleteExactAsync(tuple).thenAccept(out::packBoolean);
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
similarity index 80%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
index bc3bce0..9146a71 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
@@ -25,12 +24,11 @@ import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
/**
- * Client tuple get request.
+ * Client tuple delete request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleDeleteRequest {
/**
* Processes the request.
*
@@ -38,16 +36,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuple = readTuple(in, table, true);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.deleteAsync(tuple).thenAccept(out::packBoolean);
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
similarity index 82%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
index bc3bce0..088ca27 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
@@ -17,20 +17,19 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuples;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuples;
/**
- * Client tuple get request.
+ * Client tuple get all request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleGetAllRequest {
/**
* Processes the request.
*
@@ -38,16 +37,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var keyTuples = readTuples(in, table, true);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.getAllAsync(keyTuples).thenAccept(tuples -> writeTuples(out, tuples));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
similarity index 85%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
index bc3bce0..0240eb8 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
@@ -28,9 +27,9 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
/**
- * Client tuple get request.
+ * Client tuple get and delete request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleGetAndDeleteRequest {
/**
* Processes the request.
*
@@ -38,16 +37,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuple = readTuple(in, table, true);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.getAndDeleteAsync(tuple).thenAccept(resTuple -> writeTuple(out, resTuple));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
similarity index 85%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
index bc3bce0..c01376e 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
@@ -28,9 +27,9 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
/**
- * Client tuple get request.
+ * Client tuple get and replace request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleGetAndReplaceRequest {
/**
* Processes the request.
*
@@ -38,16 +37,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuple = readTuple(in, table, false);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.getAndReplaceAsync(tuple).thenAccept(resTuple -> writeTuple(out, resTuple));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceSchemalessRequest.java
similarity index 83%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceSchemalessRequest.java
index bc3bce0..2dd52bc 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceSchemalessRequest.java
@@ -17,20 +17,19 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTupleSchemaless;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
/**
- * Client tuple get request.
+ * Client tuple get and replace schemaless request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleGetAndReplaceSchemalessRequest {
/**
* Processes the request.
*
@@ -38,16 +37,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuple = readTupleSchemaless(in, table);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.getAndReplaceAsync(tuple).thenAccept(resTuple -> writeTuple(out, resTuple));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
similarity index 85%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
index bc3bce0..92eb104 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
@@ -28,9 +27,9 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
/**
- * Client tuple get request.
+ * Client tuple get and upsert request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleGetAndUpsertRequest {
/**
* Processes the request.
*
@@ -38,16 +37,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuple = readTuple(in, table, false);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.getAndUpsertAsync(tuple).thenAccept(resTuple -> writeTuple(out, resTuple));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertSchemalessRequest.java
similarity index 83%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertSchemalessRequest.java
index bc3bce0..313294c 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertSchemalessRequest.java
@@ -17,20 +17,19 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTupleSchemaless;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
/**
- * Client tuple get request.
+ * Client tuple get and upsert schemaless request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleGetAndUpsertSchemalessRequest {
/**
* Processes the request.
*
@@ -38,16 +37,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuple = readTupleSchemaless(in, table);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.getAndUpsertAsync(tuple).thenAccept(resTuple -> writeTuple(out, resTuple));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
index bc3bce0..417831f 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
@@ -38,13 +37,12 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
var keyTuple = readTuple(in, table, true);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
similarity index 82%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
index bc3bce0..e6329b9 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
@@ -17,20 +17,19 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuples;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuples;
/**
- * Client tuple get request.
+ * Client tuple insert all request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleInsertAllRequest {
/**
* Processes the request.
*
@@ -38,16 +37,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuples = readTuples(in, table, false);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.insertAllAsync(tuples).thenAccept(skippedTuples -> writeTuples(out, skippedTuples));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllSchemalessRequest.java
similarity index 82%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllSchemalessRequest.java
index bc3bce0..dbcc950 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllSchemalessRequest.java
@@ -17,20 +17,19 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuplesSchemaless;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuples;
/**
- * Client tuple get request.
+ * Client tuple insert all schemaless request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleInsertAllSchemalessRequest {
/**
* Processes the request.
*
@@ -38,16 +37,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuples = readTuplesSchemaless(in, table);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.insertAllAsync(tuples).thenAccept(skippedTuples -> writeTuples(out, skippedTuples));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
similarity index 80%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
index bc3bce0..57d4d2b 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
@@ -25,12 +24,11 @@ import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
/**
- * Client tuple get request.
+ * Client tuple insert request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleInsertRequest {
/**
* Processes the request.
*
@@ -38,16 +36,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuple = readTuple(in, table, false);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.insertAsync(tuple).thenAccept(out::packBoolean);
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertSchemalessRequest.java
similarity index 79%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertSchemalessRequest.java
index bc3bce0..351262b 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertSchemalessRequest.java
@@ -17,20 +17,18 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTupleSchemaless;
/**
- * Client tuple get request.
+ * Client tuple insert schemaless request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleInsertSchemalessRequest {
/**
* Processes the request.
*
@@ -38,16 +36,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuple = readTupleSchemaless(in, table);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.insertAsync(tuple).thenAccept(out::packBoolean);
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
similarity index 82%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
index bc3bce0..5b312af 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
@@ -17,20 +17,19 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readSchema;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
/**
- * Client tuple get request.
+ * Client tuple replace exact request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleReplaceExactRequest {
/**
* Processes the request.
*
@@ -38,16 +37,18 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var schema = readSchema(in, table);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ var oldTuple = readTuple(in, table, false, schema);
+ var newTuple = readTuple(in, table, false, schema);
+
+ return table.replaceAsync(oldTuple, newTuple).thenAccept(out::packBoolean);
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactSchemalessRequest.java
similarity index 79%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactSchemalessRequest.java
index bc3bce0..8c4bbb4 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactSchemalessRequest.java
@@ -17,20 +17,18 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTupleSchemaless;
/**
- * Client tuple get request.
+ * Client tuple replace exact schemaless request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleReplaceExactSchemalessRequest {
/**
* Processes the request.
*
@@ -38,16 +36,17 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ var oldTuple = readTupleSchemaless(in, table);
+ var newTuple = readTupleSchemaless(in, table);
+
+ return table.replaceAsync(oldTuple, newTuple).thenAccept(out::packBoolean);
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
similarity index 80%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
index bc3bce0..9e4da3e 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
@@ -25,12 +24,11 @@ import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
/**
- * Client tuple get request.
+ * Client tuple replace request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleReplaceRequest {
/**
* Processes the request.
*
@@ -38,16 +36,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuple = readTuple(in, table, false);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.replaceAsync(tuple).thenAccept(out::packBoolean);
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceSchemalessRequest.java
similarity index 79%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceSchemalessRequest.java
index bc3bce0..054f020 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceSchemalessRequest.java
@@ -17,20 +17,18 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTupleSchemaless;
/**
- * Client tuple get request.
+ * Client tuple replace schemaless request.
*/
-public class ClientTupleGetRequest {
+public class ClientTupleReplaceSchemalessRequest {
/**
* Processes the request.
*
@@ -38,16 +36,15 @@ public class ClientTupleGetRequest {
* @param out Packer.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
public static CompletableFuture<Void> process(
ClientMessageUnpacker in,
ClientMessagePacker out,
IgniteTables tables
- ) throws IOException {
+ ) {
var table = readTable(in, tables);
- var keyTuple = readTuple(in, table, true);
+ var tuple = readTupleSchemaless(in, table);
- return table.getAsync(keyTuple).thenAccept(t -> writeTuple(out, t));
+ return table.replaceAsync(tuple).thenAccept(out::packBoolean);
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
similarity index 81%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
index 8b98ea7..9a18adb 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
@@ -17,30 +17,28 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuples;
/**
- * Client tuple upsert request.
+ * Client tuple upsert all request.
*/
-public class ClientTupleUpsertRequest {
+public class ClientTupleUpsertAllRequest {
/**
* Processes the request.
*
* @param in Unpacker.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
- public static CompletableFuture<Void> process(ClientMessageUnpacker in, IgniteTables tables) throws IOException {
+ public static CompletableFuture<Void> process(ClientMessageUnpacker in, IgniteTables tables) {
var table = readTable(in, tables);
- var tuple = readTuple(in, table, false);
+ var tuples = readTuples(in, table, false);
- return table.upsertAsync(tuple);
+ return table.upsertAllAsync(tuples);
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllSchemalessRequest.java
similarity index 81%
copy from modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
copy to modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllSchemalessRequest.java
index 8b98ea7..23ba6b7 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllSchemalessRequest.java
@@ -17,30 +17,28 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTable;
-import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
+import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuplesSchemaless;
/**
- * Client tuple upsert request.
+ * Client tuple upsert all schemaless request.
*/
-public class ClientTupleUpsertRequest {
+public class ClientTupleUpsertAllSchemalessRequest {
/**
* Processes the request.
*
* @param in Unpacker.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
- public static CompletableFuture<Void> process(ClientMessageUnpacker in, IgniteTables tables) throws IOException {
+ public static CompletableFuture<Void> process(ClientMessageUnpacker in, IgniteTables tables) {
var table = readTable(in, tables);
- var tuple = readTuple(in, table, false);
+ var tuples = readTuplesSchemaless(in, table);
- return table.upsertAsync(tuple);
+ return table.upsertAllAsync(tuples);
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
index 8b98ea7..65cee08 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
@@ -35,9 +34,8 @@ public class ClientTupleUpsertRequest {
* @param in Unpacker.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
- public static CompletableFuture<Void> process(ClientMessageUnpacker in, IgniteTables tables) throws IOException {
+ public static CompletableFuture<Void> process(ClientMessageUnpacker in, IgniteTables tables) {
var table = readTable(in, tables);
var tuple = readTuple(in, table, false);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertSchemalessRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertSchemalessRequest.java
index 2a0bbcc..f86b229 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertSchemalessRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertSchemalessRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler.requests.table;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.table.manager.IgniteTables;
@@ -26,7 +25,7 @@ import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTupleSchemaless;
/**
- * Client tuple upsert request.
+ * Client tuple upsert schemaless request.
*/
public class ClientTupleUpsertSchemalessRequest {
/**
@@ -35,9 +34,8 @@ public class ClientTupleUpsertSchemalessRequest {
* @param in Unpacker.
* @param tables Ignite tables.
* @return Future.
- * @throws IOException On serialization error.
*/
- public static CompletableFuture<Void> process(ClientMessageUnpacker in, IgniteTables tables) throws IOException {
+ public static CompletableFuture<Void> process(ClientMessageUnpacker in, IgniteTables tables) {
var table = readTable(in, tables);
var tuple = readTupleSchemaless(in, table);
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index cb535fb..62835c2 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.client;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
@@ -106,7 +105,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
}
/** {@inheritDoc} */
- @Override public void onMessage(ByteBuf buf) throws IOException {
+ @Override public void onMessage(ByteBuf buf) {
processNextMessage(buf);
}
@@ -230,7 +229,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/**
* Process next message from the input stream and complete corresponding future.
*/
- private void processNextMessage(ByteBuf buf) throws IgniteClientException, IOException {
+ private void processNextMessage(ByteBuf buf) throws IgniteClientException {
var unpacker = new ClientMessageUnpacker(buf);
if (protocolCtx == null) {
@@ -299,7 +298,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
}
/** Send handshake request. */
- private void handshakeReq(ProtocolVersion proposedVer) throws IOException {
+ private void handshakeReq(ProtocolVersion proposedVer) {
sock.send(Unpooled.wrappedBuffer(ClientMessageCommon.MAGIC_BYTES));
var req = new ClientMessagePacker(sock.getBuffer());
@@ -360,8 +359,6 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
unpacker.skipValue(extensionsLen);
protocolCtx = protocolContextFromVersion(srvVer);
- } catch (IOException e) {
- throw handleIOError(e);
}
}
@@ -373,21 +370,6 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
}
/**
- * @param ex IO exception (cause).
- */
- private IgniteClientException handleIOError(@Nullable IOException ex) {
- return handleIOError("sock=" + sock, ex);
- }
-
- /**
- * @param chInfo Additional channel info
- * @param ex IO exception (cause).
- */
- private IgniteClientException handleIOError(String chInfo, @Nullable IOException ex) {
- return new IgniteClientConnectionException("Ignite cluster is unavailable [" + chInfo + ']', ex);
- }
-
- /**
*
*/
private static class ClientRequestFuture extends CompletableFuture<ClientMessageUnpacker> {
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
index dc93455..f2f1770 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientMessageHandler.java
@@ -30,5 +30,5 @@ public interface ClientMessageHandler {
* @param buf Buffer.
* @throws IOException on failure.
*/
- void onMessage(ByteBuf buf) throws IOException;
+ void onMessage(ByteBuf buf);
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 071f26b..dd7b67d 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -17,20 +17,23 @@
package org.apache.ignite.internal.client.table;
-import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
-
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
import org.apache.ignite.client.IgniteClientException;
+import org.apache.ignite.client.proto.ClientMessagePacker;
import org.apache.ignite.client.proto.ClientMessageUnpacker;
import org.apache.ignite.client.proto.ClientOp;
-import org.apache.ignite.internal.client.PayloadInputChannel;
-import org.apache.ignite.internal.client.PayloadOutputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -119,6 +122,7 @@ public class ClientTable implements Table {
/** {@inheritDoc} */
@Override public Table withTransaction(Transaction tx) {
+ // TODO: Transactions IGNITE-15240
throw new UnsupportedOperationException();
}
@@ -134,33 +138,28 @@ public class ClientTable implements Table {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Tuple> getAsync(@NotNull Tuple keyRec) {
- return getLatestSchema().thenCompose(schema ->
- ch.serviceAsync(ClientOp.TUPLE_GET, w -> writeTuple(keyRec, schema, w, true), r -> {
- if (r.in().getNextFormat() == MessageFormat.NIL)
- return null;
-
- var schemaVer = r.in().unpackInt();
-
- return new IgniteBiTuple<>(r, schemaVer);
- })).thenCompose(biTuple -> {
- if (biTuple == null)
- return CompletableFuture.completedFuture(null);
+ Objects.requireNonNull(keyRec);
- assert biTuple.getKey() != null;
- assert biTuple.getValue() != null;
-
- return getSchema(biTuple.getValue()).thenApply(schema -> readTuple(schema, biTuple.getKey()));
- });
+ return doSchemaOutInOpAsync(
+ ClientOp.TUPLE_GET,
+ (schema, out) -> writeTuple(keyRec, schema, out, true),
+ this::readTuple);
}
/** {@inheritDoc} */
@Override public Collection<Tuple> getAll(@NotNull Collection<Tuple> keyRecs) {
- throw new UnsupportedOperationException();
+ return getAllAsync(keyRecs).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<Tuple>> getAllAsync(@NotNull Collection<Tuple> keyRecs) {
- throw new UnsupportedOperationException();
+ Objects.requireNonNull(keyRecs);
+
+ return doSchemaOutInOpAsync(
+ ClientOp.TUPLE_GET_ALL,
+ (s, w) -> writeTuples(keyRecs, s, w, true),
+ this::readTuples,
+ Collections.emptyList());
}
/** {@inheritDoc} */
@@ -170,128 +169,201 @@ public class ClientTable implements Table {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Void> upsertAsync(@NotNull Tuple rec) {
- return getLatestSchema().thenCompose(schema -> ch.serviceAsync(ClientOp.TUPLE_UPSERT,
- w -> writeTuple(rec, schema, w, false), r -> null));
+ Objects.requireNonNull(rec);
+
+ // TODO IGNITE-15194: Convert Tuple to a schema-order Array as a first step.
+ // If it does not match the latest schema, then request latest and convert again.
+ return doSchemaOutOpAsync(
+ ClientOp.TUPLE_UPSERT,
+ (s, w) -> writeTuple(rec, s, w),
+ r -> null);
}
/** {@inheritDoc} */
@Override public void upsertAll(@NotNull Collection<Tuple> recs) {
- throw new UnsupportedOperationException();
+ upsertAllAsync(recs).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Void> upsertAllAsync(@NotNull Collection<Tuple> recs) {
- throw new UnsupportedOperationException();
+ Objects.requireNonNull(recs);
+
+ return doSchemaOutOpAsync(
+ ClientOp.TUPLE_UPSERT_ALL,
+ (s, w) -> writeTuples(recs, s, w, false),
+ r -> null);
}
/** {@inheritDoc} */
@Override public Tuple getAndUpsert(@NotNull Tuple rec) {
- throw new UnsupportedOperationException();
+ return getAndUpsertAsync(rec).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Tuple> getAndUpsertAsync(@NotNull Tuple rec) {
- throw new UnsupportedOperationException();
+ Objects.requireNonNull(rec);
+
+ return doSchemaOutInOpAsync(
+ ClientOp.TUPLE_GET_AND_UPSERT,
+ (s, w) -> writeTuple(rec, s, w, false),
+ this::readTuple);
}
/** {@inheritDoc} */
@Override public boolean insert(@NotNull Tuple rec) {
- throw new UnsupportedOperationException();
+ return insertAsync(rec).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> insertAsync(@NotNull Tuple rec) {
- throw new UnsupportedOperationException();
+ Objects.requireNonNull(rec);
+
+ return doSchemaOutOpAsync(
+ ClientOp.TUPLE_INSERT,
+ (s, w) -> writeTuple(rec, s, w, false),
+ ClientMessageUnpacker::unpackBoolean);
}
/** {@inheritDoc} */
@Override public Collection<Tuple> insertAll(@NotNull Collection<Tuple> recs) {
- throw new UnsupportedOperationException();
+ return insertAllAsync(recs).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<Tuple>> insertAllAsync(@NotNull Collection<Tuple> recs) {
- throw new UnsupportedOperationException();
+ Objects.requireNonNull(recs);
+
+ return doSchemaOutInOpAsync(
+ ClientOp.TUPLE_INSERT_ALL,
+ (s, w) -> writeTuples(recs, s, w, false),
+ this::readTuples,
+ Collections.emptyList());
}
/** {@inheritDoc} */
@Override public boolean replace(@NotNull Tuple rec) {
- throw new UnsupportedOperationException();
+ return replaceAsync(rec).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple rec) {
- throw new UnsupportedOperationException();
+ Objects.requireNonNull(rec);
+
+ return doSchemaOutOpAsync(
+ ClientOp.TUPLE_REPLACE,
+ (s, w) -> writeTuple(rec, s, w, false),
+ ClientMessageUnpacker::unpackBoolean);
}
/** {@inheritDoc} */
@Override public boolean replace(@NotNull Tuple oldRec, @NotNull Tuple newRec) {
- throw new UnsupportedOperationException();
+ return replaceAsync(oldRec, newRec).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple oldRec, @NotNull Tuple newRec) {
- throw new UnsupportedOperationException();
+ Objects.requireNonNull(oldRec);
+ Objects.requireNonNull(newRec);
+
+ return doSchemaOutOpAsync(
+ ClientOp.TUPLE_REPLACE_EXACT,
+ (s, w) -> {
+ writeTuple(oldRec, s, w, false, false);
+ writeTuple(newRec, s, w, false, true);
+ },
+ ClientMessageUnpacker::unpackBoolean);
}
/** {@inheritDoc} */
@Override public Tuple getAndReplace(@NotNull Tuple rec) {
- throw new UnsupportedOperationException();
+ return getAndReplaceAsync(rec).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(@NotNull Tuple rec) {
- throw new UnsupportedOperationException();
+ Objects.requireNonNull(rec);
+
+ return doSchemaOutInOpAsync(
+ ClientOp.TUPLE_GET_AND_REPLACE,
+ (s, w) -> writeTuple(rec, s, w, false),
+ this::readTuple);
}
/** {@inheritDoc} */
@Override public boolean delete(@NotNull Tuple keyRec) {
- throw new UnsupportedOperationException();
+ return deleteAsync(keyRec).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> deleteAsync(@NotNull Tuple keyRec) {
- throw new UnsupportedOperationException();
+ Objects.requireNonNull(keyRec);
+
+ return doSchemaOutOpAsync(
+ ClientOp.TUPLE_DELETE,
+ (s, w) -> writeTuple(keyRec, s, w, true),
+ ClientMessageUnpacker::unpackBoolean);
}
/** {@inheritDoc} */
@Override public boolean deleteExact(@NotNull Tuple rec) {
- throw new UnsupportedOperationException();
+ return deleteExactAsync(rec).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(@NotNull Tuple rec) {
- throw new UnsupportedOperationException();
+ Objects.requireNonNull(rec);
+
+ return doSchemaOutOpAsync(
+ ClientOp.TUPLE_DELETE_EXACT,
+ (s, w) -> writeTuple(rec, s, w, false),
+ ClientMessageUnpacker::unpackBoolean);
}
/** {@inheritDoc} */
@Override public Tuple getAndDelete(@NotNull Tuple rec) {
- throw new UnsupportedOperationException();
+ return getAndDeleteAsync(rec).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Tuple> getAndDeleteAsync(@NotNull Tuple rec) {
- throw new UnsupportedOperationException();
+ Objects.requireNonNull(rec);
+
+ return doSchemaOutInOpAsync(
+ ClientOp.TUPLE_GET_AND_DELETE,
+ (s, w) -> writeTuple(rec, s, w, false),
+ this::readTuple);
}
/** {@inheritDoc} */
@Override public Collection<Tuple> deleteAll(@NotNull Collection<Tuple> recs) {
- throw new UnsupportedOperationException();
+ return deleteAllAsync(recs).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllAsync(@NotNull Collection<Tuple> recs) {
- throw new UnsupportedOperationException();
+ Objects.requireNonNull(recs);
+
+ return doSchemaOutInOpAsync(
+ ClientOp.TUPLE_DELETE_ALL,
+ (s, w) -> writeTuples(recs, s, w, true),
+ (schema, in) -> readTuples(schema, in, true),
+ Collections.emptyList());
}
/** {@inheritDoc} */
@Override public Collection<Tuple> deleteAllExact(@NotNull Collection<Tuple> recs) {
- throw new UnsupportedOperationException();
+ return deleteAllExactAsync(recs).join();
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllExactAsync(@NotNull Collection<Tuple> recs) {
- throw new UnsupportedOperationException();
+ Objects.requireNonNull(recs);
+
+ return doSchemaOutInOpAsync(
+ ClientOp.TUPLE_DELETE_ALL_EXACT,
+ (s, w) -> writeTuples(recs, s, w, false),
+ this::readTuples,
+ Collections.emptyList());
}
/** {@inheritDoc} */
@@ -316,6 +388,7 @@ public class ClientTable implements Table {
/** {@inheritDoc} */
@Override public @Nullable Transaction transaction() {
+ // TODO: Transactions IGNITE-15240
throw new UnsupportedOperationException();
}
@@ -360,7 +433,7 @@ public class ClientTable implements Table {
});
}
- private ClientSchema readSchema(ClientMessageUnpacker in) throws IOException {
+ private ClientSchema readSchema(ClientMessageUnpacker in) {
var schemaVer = in.unpackInt();
var colCnt = in.unpackArrayHeader();
@@ -401,7 +474,31 @@ public class ClientTable implements Table {
return IgniteToStringBuilder.toString(ClientTable.class, this);
}
- private void writeTuple(@NotNull Tuple tuple, ClientSchema schema, PayloadOutputChannel w, boolean keyOnly) throws IOException {
+ private void writeTuple(
+ @NotNull Tuple tuple,
+ ClientSchema schema,
+ ClientMessagePacker out
+ ) {
+ writeTuple(tuple, schema, out, false, false);
+ }
+
+ private void writeTuple(
+ @NotNull Tuple tuple,
+ ClientSchema schema,
+ ClientMessagePacker out,
+ boolean keyOnly
+ ) {
+ writeTuple(tuple, schema, out, keyOnly, false);
+ }
+
+ private void writeTuple(
+ @NotNull Tuple tuple,
+ ClientSchema schema,
+ ClientMessagePacker out,
+ boolean keyOnly,
+ boolean skipHeader
+ ) {
+ // TODO: Special case for ClientTupleBuilder - it has columns in order
var vals = new Object[keyOnly ? schema.keyColumnCount() : schema.columns().length];
var tupleSize = tuple.columnCount();
@@ -415,23 +512,135 @@ public class ClientTable implements Table {
vals[col.schemaIndex()] = tuple.value(i);
}
- w.out().packUuid(id);
- w.out().packInt(schema.version());
+ if (!skipHeader) {
+ out.packUuid(id);
+ out.packInt(schema.version());
+ }
for (var val : vals)
- w.out().packObject(val);
+ out.packObject(val);
}
- private Tuple readTuple(ClientSchema schema, PayloadInputChannel r) {
+ private void writeTuples(
+ @NotNull Collection<Tuple> tuples,
+ ClientSchema schema,
+ ClientMessagePacker out,
+ boolean keyOnly
+ ) {
+ out.packUuid(id);
+ out.packInt(schema.version());
+ out.packInt(tuples.size());
+
+ for (var tuple : tuples)
+ writeTuple(tuple, schema, out, keyOnly, true);
+ }
+
+ private Tuple readTuple(ClientSchema schema, ClientMessageUnpacker in) {
+ return readTuple(schema, in, false);
+ }
+
+ private Tuple readTuple(ClientSchema schema, ClientMessageUnpacker in, boolean keyOnly) {
var builder = new ClientTupleBuilder(schema);
- try {
- for (var col : schema.columns())
- builder.setInternal(col.schemaIndex(), r.in().unpackObject(col.type()));
- } catch (IOException e) {
- throw new CompletionException(e);
- }
+ var colCnt = keyOnly ? schema.keyColumnCount() : schema.columns().length;
+
+ for (var i = 0; i < colCnt; i++)
+ builder.setInternal(i, in.unpackObject(schema.columns()[i].type()));
return builder;
}
+
+ private Collection<Tuple> readTuples(ClientSchema schema, ClientMessageUnpacker in) {
+ return readTuples(schema, in, false);
+ }
+
+ private Collection<Tuple> readTuples(ClientSchema schema, ClientMessageUnpacker in, boolean keyOnly) {
+ var cnt = in.unpackInt();
+ var res = new ArrayList<Tuple>(cnt);
+
+ for (int i = 0; i < cnt; i++)
+ res.add(readTuple(schema, in, keyOnly));
+
+ return res;
+ }
+
+ private <T> CompletableFuture<T> doSchemaOutInOpAsync(
+ int opCode,
+ BiConsumer<ClientSchema, ClientMessagePacker> writer,
+ BiFunction<ClientSchema, ClientMessageUnpacker, T> reader
+ ) {
+ return doSchemaOutInOpAsync(opCode, writer, reader, null);
+ }
+
+ private <T> CompletableFuture<T> doSchemaOutInOpAsync(
+ int opCode,
+ BiConsumer<ClientSchema, ClientMessagePacker> writer,
+ BiFunction<ClientSchema, ClientMessageUnpacker, T> reader,
+ T defaultValue
+ ) {
+ return getLatestSchema()
+ .thenCompose(schema ->
+ ch.serviceAsync(opCode,
+ w -> writer.accept(schema, w.out()),
+ r -> readSchemaAndReadData(schema, r.in(), reader, defaultValue)))
+ .thenCompose(t -> loadSchemaAndReadData(t, reader));
+ }
+
+ private <T> CompletableFuture<T> doSchemaOutOpAsync(
+ int opCode,
+ BiConsumer<ClientSchema, ClientMessagePacker> writer,
+ Function<ClientMessageUnpacker, T> reader) {
+ return getLatestSchema()
+ .thenCompose(schema ->
+ ch.serviceAsync(opCode,
+ w -> writer.accept(schema, w.out()),
+ r -> reader.apply(r.in())));
+ }
+
+ private <T> Object readSchemaAndReadData(
+ ClientSchema knownSchema,
+ ClientMessageUnpacker in,
+ BiFunction<ClientSchema, ClientMessageUnpacker, T> fn,
+ T defaultValue
+ ) {
+ if (in.getNextFormat() == MessageFormat.NIL)
+ return defaultValue;
+
+ var schemaVer = in.unpackInt();
+
+ var resSchema = schemaVer == knownSchema.version() ? knownSchema : schemas.get(schemaVer);
+
+ if (resSchema != null)
+ return fn.apply(resSchema, in); // Decode with the schema of the response version; it may differ from knownSchema.
+
+ // Schema is not yet known - request.
+ // Retain unpacker - normally it is closed when this method exits.
+ return new IgniteBiTuple<>(in.retain(), schemaVer);
+ }
+
+ private <T> CompletionStage<T> loadSchemaAndReadData(
+ Object data,
+ BiFunction<ClientSchema, ClientMessageUnpacker, T> fn
+ ) {
+ if (!(data instanceof IgniteBiTuple))
+ return CompletableFuture.completedFuture((T) data);
+
+ var biTuple = (IgniteBiTuple<ClientMessageUnpacker, Integer>) data;
+
+ var in = biTuple.getKey();
+ var schemaId = biTuple.getValue();
+
+ assert in != null;
+ assert schemaId != null;
+
+ var resFut = getSchema(schemaId).thenApply(schema -> fn.apply(schema, in));
+
+ // Close unpacker.
+ resFut.handle((tuple, err) -> {
+ in.close();
+ return null;
+ });
+
+ return resFut;
+ }
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
index 79965b0..b401ab4 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client.table;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.ignite.client.proto.ClientOp;
@@ -50,6 +51,9 @@ public class ClientTables implements IgniteTables {
/** {@inheritDoc} */
@Override public CompletableFuture<Table> createTableAsync(String name, Consumer<TableChange> tableInitChange) {
+ Objects.requireNonNull(name);
+ Objects.requireNonNull(tableInitChange);
+
throw new UnsupportedOperationException();
}
@@ -60,6 +64,9 @@ public class ClientTables implements IgniteTables {
/** {@inheritDoc} */
@Override public CompletableFuture<Void> alterTableAsync(String name, Consumer<TableChange> tableChange) {
+ Objects.requireNonNull(name);
+ Objects.requireNonNull(tableChange);
+
throw new UnsupportedOperationException();
}
@@ -70,6 +77,9 @@ public class ClientTables implements IgniteTables {
/** {@inheritDoc} */
@Override public CompletableFuture<Table> getOrCreateTableAsync(String name, Consumer<TableChange> tableInitChange) {
+ Objects.requireNonNull(name);
+ Objects.requireNonNull(tableInitChange);
+
throw new UnsupportedOperationException();
}
@@ -80,6 +90,8 @@ public class ClientTables implements IgniteTables {
/** {@inheritDoc} */
@Override public CompletableFuture<Void> dropTableAsync(String name) {
+ Objects.requireNonNull(name);
+
return ch.requestAsync(ClientOp.TABLE_DROP, w -> w.out().packString(name));
}
@@ -109,6 +121,8 @@ public class ClientTables implements IgniteTables {
/** {@inheritDoc} */
@Override public CompletableFuture<Table> tableAsync(String name) {
+ Objects.requireNonNull(name);
+
return ch.serviceAsync(ClientOp.TABLE_GET, w -> w.out().packString(name),
r -> new ClientTable(ch, r.in().unpackUuid(), name));
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleBuilder.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleBuilder.java
index 4079248..62a49d6 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleBuilder.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleBuilder.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.client.table;
import java.util.BitSet;
import java.util.Iterator;
import java.util.UUID;
-
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.TupleBuilder;
@@ -54,7 +53,7 @@ public final class ClientTupleBuilder implements TupleBuilder, Tuple {
/** {@inheritDoc} */
@Override public TupleBuilder set(String columnName, Object value) {
- // TODO: Live schema support IGNITE-15194
+ // TODO: Live schema and schema evolution support IGNITE-15194
var col = schema.column(columnName);
vals[col.schemaIndex()] = value == null ? NULL_OBJ : value;
@@ -241,6 +240,15 @@ public final class ClientTupleBuilder implements TupleBuilder, Tuple {
vals[columnIndex] = value;
}
+ /**
+ * Gets the schema.
+ *
+ * @return Schema.
+ */
+ public ClientSchema schema() {
+ return schema;
+ }
+
private void validateColumnIndex(int columnIndex) {
if (columnIndex < 0)
throw new IllegalArgumentException("Column index can't be negative");
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index b6625ed..416101c 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -26,7 +26,6 @@ import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.handler.ClientHandlerModule;
import org.apache.ignite.configuration.annotation.ConfigurationType;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
-import org.apache.ignite.internal.client.table.ClientTupleBuilder;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import org.apache.ignite.table.Tuple;
@@ -107,20 +106,22 @@ public abstract class AbstractClientTest {
}
public static void assertTupleEquals(Tuple x, Tuple y) {
- if (x == null)
+ if (x == null) {
assertNull(y);
+ return;
+ }
- if (y == null)
+ if (y == null) {
+ //noinspection ConstantConditions
assertNull(x);
+ return;
+ }
- var a = (ClientTupleBuilder) x;
- var b = (ClientTupleBuilder) y;
-
- assertEquals(a.columnCount(), b.columnCount());
+ assertEquals(x.columnCount(), y.columnCount());
- for (var i = 0; i < a.columnCount(); i++) {
- assertEquals(a.columnName(i), b.columnName(i));
- assertEquals((Object)a.value(i), b.value(i));
+ for (var i = 0; i < x.columnCount(); i++) {
+ assertEquals(x.columnName(i), y.columnName(i));
+ assertEquals((Object) x.value(i), y.value(i));
}
}
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientTableTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientTableTest.java
index 11e184a..a82ef5b 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientTableTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientTableTest.java
@@ -17,14 +17,19 @@
package org.apache.ignite.client;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
import java.util.concurrent.CompletionException;
-
+import org.apache.ignite.client.fakes.FakeSchemaRegistry;
+import org.apache.ignite.internal.client.table.ClientTupleBuilder;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -33,9 +38,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* Table tests.
*/
public class ClientTableTest extends AbstractClientTest {
+ private static final String DEFAULT_NAME = "John";
+
+ private static final Long DEFAULT_ID = 123L;
+
@Test
public void testGetWithNullInNotNullableKeyColumnThrowsException() {
- Table table = getDefaultTable();
+ var table = defaultTable();
var key = table.tupleBuilder().set("name", "123").build();
@@ -47,24 +56,20 @@ public class ClientTableTest extends AbstractClientTest {
@Test
public void testUpsertGet() {
- Table table = getDefaultTable();
-
- var tuple = table.tupleBuilder()
- .set("id", 123L)
- .set("name", "John")
- .build();
+ var table = defaultTable();
+ var tuple = tuple(table);
table.upsert(tuple);
- Tuple key = table.tupleBuilder().set("id", 123).build();
+ Tuple key = tuple(123L);
var resTuple = table.get(key);
- assertEquals("John", resTuple.stringValue("name"));
- assertEquals(123L, resTuple.longValue("id"));
+ assertEquals(DEFAULT_NAME, resTuple.stringValue("name"));
+ assertEquals(DEFAULT_ID, resTuple.longValue("id"));
assertEquals("foo", resTuple.valueOrDefault("bar", "foo"));
- assertEquals("John", resTuple.value(1));
- assertEquals(123L, (Long) resTuple.value(0));
+ assertEquals(DEFAULT_NAME, resTuple.value(1));
+ assertEquals(DEFAULT_ID, resTuple.value(0));
assertEquals(2, resTuple.columnCount());
assertEquals("id", resTuple.columnName(0));
@@ -73,10 +78,10 @@ public class ClientTableTest extends AbstractClientTest {
var iter = tuple.iterator();
assertTrue(iter.hasNext());
- assertEquals(123L, iter.next());
+ assertEquals(DEFAULT_ID, iter.next());
assertTrue(iter.hasNext());
- assertEquals("John", iter.next());
+ assertEquals(DEFAULT_NAME, iter.next());
assertFalse(iter.hasNext());
assertNull(iter.next());
@@ -86,14 +91,10 @@ public class ClientTableTest extends AbstractClientTest {
@Test
public void testUpsertGetAsync() {
- Table table = getDefaultTable();
-
- var tuple = table.tupleBuilder()
- .set("id", 42L)
- .set("name", "Jack")
- .build();
+ var table = defaultTable();
- Tuple key = table.tupleBuilder().set("id", 42).build();
+ var tuple = tuple(42L, "Jack");
+ var key = table.tupleBuilder().set("id", 42).build();
var resTuple = table.upsertAsync(tuple).thenCompose(t -> table.getAsync(key)).join();
@@ -102,7 +103,279 @@ public class ClientTableTest extends AbstractClientTest {
assertTupleEquals(tuple, resTuple);
}
- private Table getDefaultTable() {
+ @Test
+ public void testGetReturningTupleWithUnknownSchemaRequestsNewSchema() throws Exception {
+ FakeSchemaRegistry.setLastVer(2);
+
+ var table = defaultTable();
+ Tuple tuple = tuple(table);
+ table.upsert(tuple);
+
+ assertEquals(2, ((ClientTupleBuilder)tuple).schema().version());
+
+ FakeSchemaRegistry.setLastVer(1);
+
+ try (var client2 = startClient()) {
+ Table table2 = client2.tables().table(table.tableName());
+ var tuple2 = tuple(table2);
+ var resTuple = table2.get(tuple2);
+
+ assertEquals(1, ((ClientTupleBuilder)tuple2).schema().version());
+ assertEquals(2, ((ClientTupleBuilder)resTuple).schema().version());
+
+ assertEquals(DEFAULT_NAME, resTuple.stringValue("name"));
+ assertEquals(DEFAULT_ID, resTuple.longValue("id"));
+ }
+ }
+
+ @Test
+ public void testInsert() {
+ var table = defaultTable();
+
+ var tuple = tuple();
+ var tuple2 = tuple(DEFAULT_ID, "abc");
+
+ assertTrue(table.insert(tuple));
+ assertFalse(table.insert(tuple));
+ assertFalse(table.insert(tuple2));
+
+ var resTuple = table.get(defaultTupleKey(table));
+ assertTupleEquals(tuple, resTuple);
+ }
+
+ @Test
+ public void testInsertCustomTuple() {
+ var table = defaultTable();
+ var tuple = new CustomTuple(25L, "Foo");
+
+ assertTrue(table.insert(tuple));
+ assertFalse(table.insert(tuple));
+
+ var resTuple = table.get(new CustomTuple(25L));
+
+ assertTupleEquals(tuple, resTuple);
+ }
+
+ @Test
+ public void testGetAll() {
+ var table = defaultTable();
+ table.insert(tuple(1L, "1"));
+ table.insert(tuple(2L, "2"));
+ table.insert(tuple(3L, "3"));
+
+ List<Tuple> keys = Arrays.asList(tuple(1L), tuple(3L));
+ Tuple[] res = sortedTuples(table.getAll(keys));
+
+ assertEquals(2, res.length);
+
+ assertEquals(1L, res[0].longValue("id"));
+ assertEquals("1", res[0].stringValue("name"));
+
+ assertEquals(3L, res[1].longValue("id"));
+ assertEquals("3", res[1].stringValue("name"));
+ }
+
+ @Test
+ public void testUpsertAll() {
+ var table = defaultTable();
+
+ List<Tuple> data = Arrays.asList(tuple(1L, "1"), tuple(2L, "2"));
+ table.upsertAll(data);
+
+ assertEquals("1", table.get(tuple(1L)).stringValue("name"));
+ assertEquals("2", table.get(tuple(2L)).stringValue("name"));
+
+ List<Tuple> data2 = Arrays.asList(tuple(1L, "10"), tuple(3L, "30"));
+ table.upsertAll(data2);
+
+ assertEquals("10", table.get(tuple(1L)).stringValue("name"));
+ assertEquals("2", table.get(tuple(2L)).stringValue("name"));
+ assertEquals("30", table.get(tuple(3L)).stringValue("name"));
+ }
+
+ @Test
+ public void testInsertAll() {
+ var table = defaultTable();
+
+ List<Tuple> data = Arrays.asList(tuple(1L, "1"), tuple(2L, "2"));
+ var skippedTuples = table.insertAll(data);
+
+ assertEquals(0, skippedTuples.size());
+ assertEquals("1", table.get(tuple(1L)).stringValue("name"));
+ assertEquals("2", table.get(tuple(2L)).stringValue("name"));
+
+ List<Tuple> data2 = Arrays.asList(tuple(1L, "10"), tuple(3L, "30"));
+ var skippedTuples2 = table.insertAll(data2).toArray(new Tuple[0]);
+
+ assertEquals(1, skippedTuples2.length);
+ assertEquals(1L, skippedTuples2[0].longValue("id"));
+ assertEquals("1", table.get(tuple(1L)).stringValue("name"));
+ assertEquals("2", table.get(tuple(2L)).stringValue("name"));
+ assertEquals("30", table.get(tuple(3L)).stringValue("name"));
+ }
+
+ @Test
+ public void testReplace() {
+ var table = defaultTable();
+ table.insert(tuple(1L, "1"));
+
+ assertFalse(table.replace(tuple(3L, "3")));
+ assertNull(table.get(tuple(3L)));
+
+ assertTrue(table.replace(tuple(1L, "2")));
+ assertEquals("2", table.get(tuple(1L)).value("name"));
+ }
+
+ @Test
+ public void testReplaceExact() {
+ var table = defaultTable();
+ table.insert(tuple(1L, "1"));
+
+ assertFalse(table.replace(tuple(3L, "3"), tuple(3L, "4")));
+ assertNull(table.get(tuple(3L)));
+
+ assertFalse(table.replace(tuple(1L, "2"), tuple(1L, "3")));
+ assertTrue(table.replace(tuple(1L, "1"), tuple(1L, "3")));
+ assertEquals("3", table.get(tuple(1L)).value("name"));
+ }
+
+ @Test
+ public void testGetAndReplace() {
+ var table = defaultTable();
+ var tuple = tuple(1L, "1");
+ table.insert(tuple);
+
+ assertNull(table.getAndReplace(tuple(3L, "3")));
+ assertNull(table.get(tuple(3L)));
+
+ var replaceRes = table.getAndReplace(tuple(1L, "2"));
+ assertTupleEquals(tuple, replaceRes);
+ assertEquals("2", table.get(tuple(1L)).value("name"));
+ }
+
+ @Test
+ public void testDelete() {
+ var table = defaultTable();
+ table.insert(tuple(1L, "1"));
+
+ assertFalse(table.delete(tuple(2L)));
+ assertTrue(table.delete(tuple(1L)));
+ assertNull(table.get(tuple(1L)));
+ }
+
+ @Test
+ public void testDeleteExact() {
+ var table = defaultTable();
+ table.insert(tuple(1L, "1"));
+ table.insert(tuple(2L, "2"));
+
+ assertFalse(table.deleteExact(tuple(1L)));
+ assertFalse(table.deleteExact(tuple(1L, "x")));
+ assertTrue(table.deleteExact(tuple(1L, "1")));
+ assertFalse(table.deleteExact(tuple(2L)));
+ assertFalse(table.deleteExact(tuple(3L)));
+
+ assertNull(table.get(tuple(1L)));
+ assertNotNull(table.get(tuple(2L)));
+ }
+
+ @Test
+ public void testGetAndDelete() {
+ var table = defaultTable();
+ var tuple = tuple(1L, "1");
+ table.insert(tuple);
+
+ var deleted = table.getAndDelete(tuple(1L));
+
+ assertNull(table.getAndDelete(tuple(1L)));
+ assertNull(table.getAndDelete(tuple(2L)));
+ assertTupleEquals(tuple, deleted);
+ }
+
+ @Test
+ public void testDeleteAll() {
+ var table = defaultTable();
+
+ List<Tuple> data = Arrays.asList(tuple(1L, "1"), tuple(2L, "2"));
+ table.insertAll(data);
+
+ List<Tuple> toDelete = Arrays.asList(tuple(1L, "x"), tuple(3L, "y"), tuple(4L, "z"));
+ var skippedTuples = sortedTuples(table.deleteAll(toDelete));
+
+ assertEquals(2, skippedTuples.length);
+ assertNull(table.get(tuple(1L)));
+ assertNotNull(table.get(tuple(2L)));
+
+ assertEquals(3L, skippedTuples[0].longValue("id"));
+ assertNull(skippedTuples[0].stringValue("name"));
+
+ assertEquals(4L, skippedTuples[1].longValue("id"));
+ assertNull(skippedTuples[1].stringValue("name"));
+ }
+
+ @Test
+ public void testDeleteAllExact() {
+ var table = defaultTable();
+
+ List<Tuple> data = Arrays.asList(tuple(1L, "1"), tuple(2L, "2"));
+ table.insertAll(data);
+
+ List<Tuple> toDelete = Arrays.asList(tuple(1L, "1"), tuple(2L, "y"), tuple(3L, "z"));
+ var skippedTuples = sortedTuples(table.deleteAllExact(toDelete));
+
+ assertEquals(2, skippedTuples.length);
+ assertNull(table.get(tuple(1L)));
+ assertNotNull(table.get(tuple(2L)));
+
+ assertEquals(2L, skippedTuples[0].longValue("id"));
+ assertEquals("y", skippedTuples[0].stringValue("name"));
+
+ assertEquals(3L, skippedTuples[1].longValue("id"));
+ assertEquals("z", skippedTuples[1].stringValue("name"));
+ }
+
+ private static Tuple[] sortedTuples(Collection<Tuple> tuples) {
+ Tuple[] res = tuples.toArray(new Tuple[0]);
+
+ Arrays.sort(res, (x, y) -> Long.compare(x.longValue(0), y.longValue(0))); // Overflow-safe, unlike subtract-and-cast.
+
+ return res;
+ }
+
+ private Tuple tuple() {
+ return defaultTable().tupleBuilder()
+ .set("id", DEFAULT_ID)
+ .set("name", DEFAULT_NAME)
+ .build();
+ }
+
+ private Tuple tuple(Table table) {
+ return table.tupleBuilder()
+ .set("id", DEFAULT_ID)
+ .set("name", DEFAULT_NAME)
+ .build();
+ }
+
+ private Tuple tuple(Long id) {
+ return defaultTable().tupleBuilder()
+ .set("id", id)
+ .build();
+ }
+
+ private Tuple tuple(Long id, String name) {
+ return defaultTable().tupleBuilder()
+ .set("id", id)
+ .set("name", name)
+ .build();
+ }
+
+ private Tuple defaultTupleKey(Table table) {
+ return table.tupleBuilder()
+ .set("id", DEFAULT_ID)
+ .build();
+ }
+
+ private Table defaultTable() {
server.tables().getOrCreateTable(DEFAULT_TABLE, tbl -> tbl.changeReplicas(1));
return client.tables().table(DEFAULT_TABLE);
diff --git a/modules/client/src/test/java/org/apache/ignite/client/CustomTuple.java b/modules/client/src/test/java/org/apache/ignite/client/CustomTuple.java
new file mode 100644
index 0000000..b48a1da
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/CustomTuple.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.UUID;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * User-defined test {@link Tuple} implementation.
+ */
+public class CustomTuple implements Tuple {
+ /** */
+ private final Long id;
+
+ /** */
+ private final String name;
+
+ /** */
+ public CustomTuple(Long id) {
+ this(id, null);
+ }
+
+ /** */
+ public CustomTuple(Long id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ @Override public int columnCount() {
+ return 2;
+ }
+
+ @Override public String columnName(int columnIndex) {
+ switch (columnIndex) {
+ case 0: return "id";
+ case 1: return "name";
+ }
+
+ return null;
+ }
+
+ @Override public Integer columnIndex(String columnName) {
+ switch (columnName) {
+ case "id": return 0;
+ case "name": return 1;
+ }
+
+ return null;
+ }
+
+ @Override public <T> T valueOrDefault(String columnName, T def) {
+ switch (columnName) {
+ case "id": return (T) id;
+ case "name": return (T) name;
+ }
+
+ return def;
+ }
+
+ @Override public <T> T value(String columnName) {
+ return valueOrDefault(columnName, null);
+ }
+
+ @Override public <T> T value(int columnIndex) {
+ switch (columnIndex) {
+ case 0: return (T) id;
+ case 1: return (T) name;
+ }
+
+ return null;
+ }
+
+ @Override public BinaryObject binaryObjectValue(String columnName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public BinaryObject binaryObjectValue(int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public byte byteValue(String columnName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public byte byteValue(int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public short shortValue(String columnName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public short shortValue(int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public int intValue(String columnName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public int intValue(int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public long longValue(String columnName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public long longValue(int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public float floatValue(String columnName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public float floatValue(int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public double doubleValue(String columnName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public double doubleValue(int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public String stringValue(String columnName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public String stringValue(int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public UUID uuidValue(String columnName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public UUID uuidValue(int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public BitSet bitmaskValue(String columnName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public BitSet bitmaskValue(int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @NotNull @Override public Iterator<Object> iterator() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
index 68ce1cb..4b19263 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
@@ -27,7 +27,7 @@ import org.apache.ignite.configuration.schemas.table.TableChange;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.lang.IgniteException;
@@ -135,8 +135,8 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
return new TableImpl(new FakeInternalTable(name, tableId), getSchemaReg(tableId), null, null);
}
- @NotNull private SchemaRegistryImpl getSchemaReg(UUID tableId) {
- return new SchemaRegistryImpl(1, v -> getSchema(v, tableId));
+ @NotNull private SchemaRegistry getSchemaReg(UUID tableId) {
+     // The registry receives a function that maps a schema version to this table's descriptor.
+     return new FakeSchemaRegistry(ver -> getSchema(ver, tableId));
}
/**
@@ -146,14 +146,26 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
* @return Schema descriptor.
*/
private SchemaDescriptor getSchema(Integer v, UUID tableId) {
- if (v != 1)
- return null;
-
- return new SchemaDescriptor(
- tableId,
- 1,
- new Column[]{new Column("id", NativeTypes.INT64, false)},
- new Column[]{new Column("name", NativeTypes.STRING, true)});
+     // Version 1: an "id" column in the first column set and a "name" column in the second.
+     if (v == 1) {
+         return new SchemaDescriptor(
+             tableId,
+             1,
+             new Column[]{new Column("id", NativeTypes.INT64, false)},
+             new Column[]{new Column("name", NativeTypes.STRING, true)});
+     }
+
+     // Version 2: same as version 1 plus an additional "xyz" string column in the second column set.
+     if (v == 2) {
+         return new SchemaDescriptor(
+             tableId,
+             2,
+             new Column[]{new Column("id", NativeTypes.INT64, false)},
+             new Column[]{
+                 new Column("name", NativeTypes.STRING, true),
+                 new Column("xyz", NativeTypes.STRING, true)
+             });
+     }
+
+     // No descriptor is defined for any other version.
+     return null;
}
/** {@inheritDoc} */
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index a32eea9..d92f289 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -25,6 +25,7 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -34,12 +35,21 @@ import java.util.concurrent.ConcurrentHashMap;
* Fake internal table.
*/
public class FakeInternalTable implements InternalTable {
+ /** Table name. */
private final String tableName;
+ /** Table ID. */
private final UUID tableId;
+ /** Table data. */
private final ConcurrentHashMap<ByteBuffer, BinaryRow> data = new ConcurrentHashMap<>();
+ /**
+ * Constructor.
+ *
+ * @param tableName Name.
+ * @param tableId Id.
+ */
public FakeInternalTable(String tableName, UUID tableId) {
this.tableName = tableName;
this.tableId = tableId;
@@ -72,7 +82,16 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override public CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows, @Nullable Transaction tx) {
- return null;
+     var res = new ArrayList<BinaryRow>();
+
+     for (var key : keyRows) {
+         // Forward the caller's transaction instead of a hard-coded null, as the other operations here do.
+         var val = get(key, tx);
+
+         // NOTE(review): this checks the future, not the row it carries, so a key without a stored row
+         // presumably contributes a null element to the result - confirm that callers expect this.
+         if (val != null)
+             res.add(val.getNow(null));
+     }
+
+     return CompletableFuture.completedFuture(res);
}
/** {@inheritDoc} */
@@ -84,61 +103,126 @@ public class FakeInternalTable implements InternalTable {
/** {@inheritDoc} */
@Override public CompletableFuture<Void> upsertAll(Collection<BinaryRow> rows, @Nullable Transaction tx) {
- return null;
+     // Apply the single-row upsert to each row in iteration order; the per-row futures are not awaited.
+     rows.forEach(r -> upsert(r, tx));
+
+     return CompletableFuture.completedFuture(null);
}
/** {@inheritDoc} */
@Override public CompletableFuture<BinaryRow> getAndUpsert(BinaryRow row, @Nullable Transaction tx) {
- return null;
+     // Start the lookup before the write so the returned row is the one stored prior to the upsert.
+     var before = get(row, tx);
+
+     upsert(row, tx);
+
+     var prevRow = before.getNow(null);
+
+     return CompletableFuture.completedFuture(prevRow);
}
/** {@inheritDoc} */
@Override public CompletableFuture<Boolean> insert(BinaryRow row, @Nullable Transaction tx) {
- return null;
+     // A row already returned by the lookup blocks the insert.
+     if (get(row, tx).getNow(null) != null)
+         return CompletableFuture.completedFuture(false);
+
+     upsert(row, tx);
+
+     return CompletableFuture.completedFuture(true);
}
/** {@inheritDoc} */
@Override public CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRow> rows, @Nullable Transaction tx) {
- return null;
+ var skipped = new ArrayList<BinaryRow>();
+
+ for (var row : rows) {
+ if (!insert(row, tx).getNow(null))
+ skipped.add(row);
+ }
+
+ return CompletableFuture.completedFuture(skipped);
}
/** {@inheritDoc} */
@Override public CompletableFuture<Boolean> replace(BinaryRow row, @Nullable Transaction tx) {
- return null;
+     var existing = get(row, tx).getNow(null);
+
+     // Overwrite only when the lookup found a row; otherwise report false without writing.
+     if (existing != null)
+         return upsert(row, tx).thenApply(ignored -> true);
+
+     return CompletableFuture.completedFuture(false);
}
/** {@inheritDoc} */
@Override public CompletableFuture<Boolean> replace(BinaryRow oldRow, BinaryRow newRow, @Nullable Transaction tx) {
- return null;
+ var old = get(oldRow, tx).getNow(null);
+
+ if (old == null || !old.valueSlice().equals(oldRow.valueSlice()))
+ return CompletableFuture.completedFuture(false);
+
+ return upsert(newRow, tx).thenApply(f -> true);
}
/** {@inheritDoc} */
@Override public CompletableFuture<BinaryRow> getAndReplace(BinaryRow row, @Nullable Transaction tx) {
- return null;
+     // The lookup is issued before the replace, and its future is what the caller ultimately receives.
+     var previous = get(row, tx);
+
+     return replace(row, tx).thenCompose(replaced -> previous);
}
/** {@inheritDoc} */
@Override public CompletableFuture<Boolean> delete(BinaryRow keyRow, @Nullable Transaction tx) {
- return null;
+ var old = get(keyRow, tx).getNow(null);
+
+ if (old != null)
+ data.remove(keyRow.keySlice());
+
+ return CompletableFuture.completedFuture(old != null);
}
/** {@inheritDoc} */
@Override public CompletableFuture<Boolean> deleteExact(BinaryRow oldRow, @Nullable Transaction tx) {
- return null;
+     var stored = get(oldRow, tx).getNow(null);
+
+     // No row found, or its value slice differs from that of oldRow: leave the data untouched.
+     if (stored == null || !stored.valueSlice().equals(oldRow.valueSlice()))
+         return CompletableFuture.completedFuture(false);
+
+     data.remove(oldRow.keySlice());
+
+     return CompletableFuture.completedFuture(true);
}
/** {@inheritDoc} */
@Override public CompletableFuture<BinaryRow> getAndDelete(BinaryRow row, @Nullable Transaction tx) {
- return null;
+     var removed = get(row, tx).getNow(null);
+
+     // Nothing found: complete with null and do not touch the data.
+     if (removed == null)
+         return CompletableFuture.completedFuture(null);
+
+     data.remove(row.keySlice());
+
+     return CompletableFuture.completedFuture(removed);
}
/** {@inheritDoc} */
@Override public CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRow> rows, @Nullable Transaction tx) {
- return null;
+ var skipped = new ArrayList<BinaryRow>();
+
+ for (var row : rows) {
+ if (!delete(row, tx).getNow(false))
+ skipped.add(row);
+ }
+
+ return CompletableFuture.completedFuture(skipped);
}
/** {@inheritDoc} */
@Override public CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRow> rows, @Nullable Transaction tx) {
- return null;
+ var skipped = new ArrayList<BinaryRow>();
+
+ for (var row : rows) {
+ if (!deleteExact(row, tx).getNow(false))
+ skipped.add(row);
+ }
+
+ return CompletableFuture.completedFuture(skipped);
}
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
new file mode 100644
index 0000000..5b7c5be
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSchemaRegistry.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.fakes;
+
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.row.Row;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Fake schema registry for tests.
+ */
+public class FakeSchemaRegistry implements SchemaRegistry {
+ /** Last registered version. */
+ private static volatile int lastVer = 1;
+
+ /** Cached schemas. */
+ private final ConcurrentNavigableMap<Integer, SchemaDescriptor> schemaCache = new ConcurrentSkipListMap<>();
+
+ /** Schema store. */
+ private final Function<Integer, SchemaDescriptor> history;
+
+ /**
+ * Constructor.
+ *
+ * @param history Schema history.
+ */
+ public FakeSchemaRegistry(Function<Integer, SchemaDescriptor> history) {
+ this.history = history;
+ }
+
+ /**
+ * Sets the last schema version
+ *
+ * @param lastVer Last schema version.
+ */
+ public static void setLastVer(int lastVer) {
+ FakeSchemaRegistry.lastVer = lastVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override @NotNull public SchemaDescriptor schema(int ver) {
+ SchemaDescriptor desc = schemaCache.get(ver);
+
+ if (desc != null)
+ return desc;
+
+ desc = history.apply(ver);
+
+ if (desc != null) {
+ schemaCache.putIfAbsent(ver, desc);
+
+ return desc;
+ }
+
+ if (lastVer < ver || ver <= 0)
+ throw new SchemaRegistryException("Incorrect schema version requested: ver=" + ver);
+ else
+ throw new SchemaRegistryException("Failed to find schema: ver=" + ver);
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable SchemaDescriptor schema() {
+ return schema(lastVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lastSchemaVersion() {
+ return lastVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row resolve(BinaryRow row) {
+ return new Row(schema(row.schemaVersion()), row);
+ }
+}