You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/11/10 12:25:38 UTC

[GitHub] [ignite-3] isapego commented on a change in pull request #434: IGNITE-15234 Implement ByteBuf-based message packer and unpacker

isapego commented on a change in pull request #434:
URL: https://github.com/apache/ignite-3/pull/434#discussion_r746533918



##########
File path: modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
##########
@@ -167,7 +167,7 @@ void testHandshakeInvalidVersionReturnsError() throws Exception {
             final var err = unpacker.unpackString();
             
             assertArrayEquals(MAGIC, magic);
-            assertEquals(31, len);
+            assertEquals(32, len);

Review comment:
       Why did this change occur?

##########
File path: modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
##########
@@ -36,637 +34,748 @@
 import java.util.BitSet;
 import java.util.UUID;
 import org.apache.ignite.lang.IgniteUuid;
-import org.msgpack.core.MessagePack;
-import org.msgpack.core.MessagePacker;
-import org.msgpack.core.buffer.OutputStreamBufferOutput;
-import org.msgpack.value.Value;
 
 /**
- * Ignite-specific MsgPack extension based on Netty ByteBuf.
+ * ByteBuf-based MsgPack implementation. Replaces {@link org.msgpack.core.MessagePacker} to avoid
+ * extra buffers and indirection.
  *
  * <p>Releases wrapped buffer on {@link #close()} .
  */
-public class ClientMessagePacker extends MessagePacker {
-    /** Underlying buffer. */
+public class ClientMessagePacker implements AutoCloseable {
+    /**
+     * Underlying buffer.
+     */
     private final ByteBuf buf;
-
-    /** Closed flag. */
-    private boolean closed = false;
-
+    
+    /**
+     * Closed flag.
+     */
+    private boolean closed;
+    
     /**
      * Constructor.
      *
      * @param buf Buffer.
      */
     public ClientMessagePacker(ByteBuf buf) {
-        // TODO: Remove intermediate classes and buffers IGNITE-15234.
-        // Reserve 4 bytes for the message length.
-        super(new OutputStreamBufferOutput(new ByteBufOutputStream(buf.writerIndex(HEADER_SIZE))),
-                MessagePack.DEFAULT_PACKER_CONFIG);
-
-        this.buf = buf;
+        this.buf = buf.writerIndex(HEADER_SIZE);
     }
-
+    
     /**
      * Gets the underlying buffer.
      *
      * @return Underlying buffer.
-     * @throws UncheckedIOException When flush fails.
      */
     public ByteBuf getBuffer() {
-        try {
-            flush();
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-
         buf.setInt(0, buf.writerIndex() - HEADER_SIZE);
-
+        
         return buf;
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packNil() {
-        assert !closed : "Packer is closed";
-
-        try {
-            return super.packNil();
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packBoolean(boolean b) {
-        assert !closed : "Packer is closed";
-
-        try {
-            return super.packBoolean(b);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packByte(byte b) {
-        assert !closed : "Packer is closed";
-
-        try {
-            return super.packByte(b);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packShort(short v) {
+    
+    /**
+     * Writes a Nil value.
+     */
+    public void packNil() {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.packShort(v);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        
+        buf.writeByte(Code.NIL);
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packInt(int r) {
+    
+    /**
+     * Writes a boolean value.
+     *
+     * @param b the value to be written.
+     */
+    public void packBoolean(boolean b) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.packInt(r);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        
+        buf.writeByte(b ? Code.TRUE : Code.FALSE);
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packLong(long v) {
+    
+    /**
+     * Writes an Integer value.
+     *
+     * @param b the value to be written.
+     */
+    public void packByte(byte b) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.packLong(v);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
+        
+        if (b < -(1 << 5)) {
+            buf.writeByte(Code.INT8);
         }
+        
+        buf.writeByte(b);
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packBigInteger(BigInteger bi) {
+    
+    /**
+     * Writes a short value.
+     *
+     * @param v the value to be written.
+     */
+    public void packShort(short v) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.packBigInteger(bi);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
+        
+        if (v < -(1 << 5)) {
+            if (v < -(1 << 7)) {
+                buf.writeByte(Code.INT16);
+                buf.writeShort(v);
+            } else {
+                buf.writeByte(Code.INT8);
+                buf.writeByte(v);
+            }
+        } else if (v < (1 << 7)) {
+            buf.writeByte(v);
+        } else {
+            if (v < (1 << 8)) {
+                buf.writeByte(Code.UINT8);
+                buf.writeByte(v);
+            } else {
+                buf.writeByte(Code.UINT16);
+                buf.writeShort(v);
+            }
         }
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packFloat(float v) {
+    
+    /**
+     * Writes an int value.
+     *
+     * @param i the value to be written.
+     */
+    public void packInt(int i) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.packFloat(v);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
+        
+        if (i < -(1 << 5)) {
+            if (i < -(1 << 15)) {
+                buf.writeByte(Code.INT32);
+                buf.writeInt(i);
+            } else if (i < -(1 << 7)) {
+                buf.writeByte(Code.INT16);
+                buf.writeShort(i);
+            } else {
+                buf.writeByte(Code.INT8);
+                buf.writeByte(i);
+            }
+        } else if (i < (1 << 7)) {
+            buf.writeByte(i);
+        } else {
+            if (i < (1 << 8)) {
+                buf.writeByte(Code.UINT8);
+                buf.writeByte(i);
+            } else if (i < (1 << 16)) {
+                buf.writeByte(Code.UINT16);
+                buf.writeShort(i);
+            } else {
+                buf.writeByte(Code.UINT32);
+                buf.writeInt(i);
+            }
         }
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packDouble(double v) {
+    
+    /**
+     * Writes a long value.
+     *
+     * @param v the value to be written.
+     */
+    public void packLong(long v) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.packDouble(v);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
+        
+        if (v < -(1L << 5)) {
+            if (v < -(1L << 15)) {
+                if (v < -(1L << 31)) {
+                    buf.writeByte(Code.INT64);
+                    buf.writeLong(v);
+                } else {
+                    buf.writeByte(Code.INT32);
+                    buf.writeInt((int) v);
+                }
+            } else {
+                if (v < -(1 << 7)) {
+                    buf.writeByte(Code.INT16);
+                    buf.writeShort((short) v);
+                } else {
+                    buf.writeByte(Code.INT8);
+                    buf.writeByte((byte) v);
+                }
+            }
+        } else if (v < (1 << 7)) {
+            // fixnum
+            buf.writeByte((byte) v);
+        } else {
+            if (v < (1L << 16)) {
+                if (v < (1 << 8)) {
+                    buf.writeByte(Code.UINT8);
+                    buf.writeByte((byte) v);
+                } else {
+                    buf.writeByte(Code.UINT16);
+                    buf.writeShort((short) v);
+                }
+            } else {
+                if (v < (1L << 32)) {
+                    buf.writeByte(Code.UINT32);
+                    buf.writeInt((int) v);
+                } else {
+                    buf.writeByte(Code.UINT64);
+                    buf.writeLong(v);
+                }
+            }
         }
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packString(String s) {
+    
+    /**
+     * Writes a big integer value.
+     *
+     * @param bi the value to be written.
+     */
+    public void packBigInteger(BigInteger bi) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.packString(s);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packArrayHeader(int arraySize) {
+        
+        if (bi.bitLength() <= 63) {
+            packLong(bi.longValue());
+        } else if (bi.bitLength() == 64 && bi.signum() == 1) {
+            buf.writeByte(Code.UINT64);
+            buf.writeLong(bi.longValue());
+        } else {
+            throw new IllegalArgumentException(
+                    "MessagePack cannot serialize BigInteger larger than 2^64-1");
+        }
+    }
+    
+    /**
+     * Writes a float value.
+     *
+     * @param v the value to be written.
+     */
+    public void packFloat(float v) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.packArrayHeader(arraySize);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        
+        buf.writeByte(Code.FLOAT32);
+        buf.writeFloat(v);
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packMapHeader(int mapSize) {
+    
+    /**
+     * Writes a double value.
+     *
+     * @param v the value to be written.
+     */
+    public void packDouble(double v) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.packMapHeader(mapSize);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        
+        buf.writeByte(Code.FLOAT64);
+        buf.writeDouble(v);
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packValue(Value v) {
+    
+    /**
+     * Writes a string value.
+     *
+     * @param s the value to be written.
+     */
+    public void packString(String s) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.packValue(v);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packExtensionTypeHeader(byte extType, int payloadLen) {
+        
+        // Header is a varint.
+        // Use pessimistic utf8MaxBytes to reserve bytes for the header.
+        // This may cause an extra byte to be used for the header,
+        // but this is cheaper than calculating correct utf8 byte size, which involves full string scan.
+        int maxBytes = ByteBufUtil.utf8MaxBytes(s);
+        int headerSize = getStringHeaderSize(maxBytes);
+        int headerPos = buf.writerIndex();
+        
+        buf.writerIndex(headerPos + headerSize);
+        
+        int bytesWritten = ByteBufUtil.writeUtf8(buf, s);
+        int endPos = buf.writerIndex();
+        
+        buf.writerIndex(headerPos);
+    
+        if (headerSize == 1) {
+            buf.writeByte((byte) (Code.FIXSTR_PREFIX | bytesWritten));
+        } else if (headerSize == 2) {
+            buf.writeByte(Code.STR8);
+            buf.writeByte(bytesWritten);
+        } else if (headerSize == 3) {
+            buf.writeByte(Code.STR16);
+            buf.writeShort(bytesWritten);
+        } else {
+            assert headerSize == 5 : "headerSize == 5";
+            buf.writeByte(Code.STR32);
+            buf.writeInt(bytesWritten);
+        }
+        
+        buf.writerIndex(endPos);
+    }
+    
+    /**
+     * Writes an array header value.
+     *
+     * @param arraySize array size.
+     */
+    public void packArrayHeader(int arraySize) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.packExtensionTypeHeader(extType, payloadLen);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packBinaryHeader(int len) {
+        
+        if (arraySize < 0) {
+            throw new IllegalArgumentException("array size must be >= 0");
+        }
+        
+        if (arraySize < (1 << 4)) {
+            buf.writeByte((byte) (Code.FIXARRAY_PREFIX | arraySize));
+        } else if (arraySize < (1 << 16)) {
+            buf.writeByte(Code.ARRAY16);
+            buf.writeShort(arraySize);
+        } else {
+            buf.writeByte(Code.ARRAY32);
+            buf.writeInt(arraySize);
+        }
+    }
+    
+    /**
+     * Writes a map header value.
+     *
+     * @param mapSize map size.
+     */
+    public void packMapHeader(int mapSize) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.packBinaryHeader(len);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker packRawStringHeader(int len) {
+        
+        if (mapSize < 0) {
+            throw new IllegalArgumentException("map size must be >= 0");
+        }
+        
+        if (mapSize < (1 << 4)) {
+            buf.writeByte((byte) (Code.FIXMAP_PREFIX | mapSize));
+        } else if (mapSize < (1 << 16)) {
+            buf.writeByte(Code.MAP16);
+            buf.writeShort(mapSize);
+        } else {
+            buf.writeByte(Code.MAP32);
+            buf.writeInt(mapSize);
+        }
+    }
+    
+    /**
+     * Writes Extension value header.
+     *
+     * <p>Should be followed by {@link #writePayload(byte[])} method to write the extension body.
+     *
+     * @param extType    the extension type tag to be written.
+     * @param payloadLen number of bytes of a payload binary to be written.
+     */
+    public void packExtensionTypeHeader(byte extType, int payloadLen) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.packRawStringHeader(len);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
+        
+        if (payloadLen < (1 << 8)) {
+            if (payloadLen > 0
+                    && (payloadLen & (payloadLen - 1)) == 0) { // check whether dataLen == 2^x
+                if (payloadLen == 1) {
+                    buf.writeByte(Code.FIXEXT1);
+                    buf.writeByte(extType);
+                } else if (payloadLen == 2) {
+                    buf.writeByte(Code.FIXEXT2);
+                    buf.writeByte(extType);
+                } else if (payloadLen == 4) {
+                    buf.writeByte(Code.FIXEXT4);
+                    buf.writeByte(extType);
+                } else if (payloadLen == 8) {
+                    buf.writeByte(Code.FIXEXT8);
+                    buf.writeByte(extType);
+                } else if (payloadLen == 16) {
+                    buf.writeByte(Code.FIXEXT16);
+                    buf.writeByte(extType);
+                } else {
+                    buf.writeByte(Code.EXT8);
+                    buf.writeByte(payloadLen);
+                    buf.writeByte(extType);
+                }
+            } else {
+                buf.writeByte(Code.EXT8);
+                buf.writeByte(payloadLen);
+                buf.writeByte(extType);
+            }
+        } else if (payloadLen < (1 << 16)) {
+            buf.writeByte(Code.EXT16);
+            buf.writeShort(payloadLen);
+            buf.writeByte(extType);
+        } else {
+            buf.writeByte(Code.EXT32);
+            buf.writeInt(payloadLen);
+            buf.writeByte(extType);
         }
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker writePayload(byte[] src) {
+    
+    /**
+     * Writes a binary header value.
+     *
+     * @param len binary value size.
+     */
+    public void packBinaryHeader(int len) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.writePayload(src);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker writePayload(byte[] src, int off, int len) {
+        
+        if (len < (1 << 8)) {
+            buf.writeByte(Code.BIN8);
+            buf.writeByte(len);
+        } else if (len < (1 << 16)) {
+            buf.writeByte(Code.BIN16);
+            buf.writeShort(len);
+        } else {
+            buf.writeByte(Code.BIN32);
+            buf.writeInt(len);
+        }
+    }
+    
+    /**
+     * Writes a raw string header value.
+     *
+     * @param len string value size.
+     */
+    public void packRawStringHeader(int len) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.writePayload(src, off, len);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker addPayload(byte[] src) {
+        
+        if (len < (1 << 5)) {
+            buf.writeByte((byte) (Code.FIXSTR_PREFIX | len));
+        } else if (len < (1 << 8)) {
+            buf.writeByte(Code.STR8);
+            buf.writeByte(len);
+        } else if (len < (1 << 16)) {
+            buf.writeByte(Code.STR16);
+            buf.writeShort(len);
+        } else {
+            buf.writeByte(Code.STR32);
+            buf.writeInt(len);
+        }
+    }
+    
+    /**
+     * Writes a byte array to the output.
+     *
+     * <p>This method is used with {@link #packRawStringHeader(int)} or {@link #packBinaryHeader(int)}
+     * methods.
+     *
+     * @param src the data to add.
+     */
+    public void writePayload(byte[] src) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.addPayload(src);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        
+        buf.writeBytes(src);
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public MessagePacker addPayload(byte[] src, int off, int len) {
+    
+    /**
+     * Writes a byte array to the output.
+     *
+     * <p>This method is used with {@link #packRawStringHeader(int)} or {@link #packBinaryHeader(int)}
+     * methods.
+     *
+     * @param src the data to add.
+     * @param off the start offset in the data.
+     * @param len the number of bytes to add.
+     */
+    public void writePayload(byte[] src, int off, int len) {
         assert !closed : "Packer is closed";
-
-        try {
-            return super.addPayload(src, off, len);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        
+        buf.writeBytes(src, off, len);
     }
-
+    
     /**
-     * Writes an UUID.
+     * Writes a UUID.
      *
      * @param val UUID value.
-     * @return This instance.
      */
-    public ClientMessagePacker packUuid(UUID val) {
+    public void packUuid(UUID val) {
         assert !closed : "Packer is closed";
-
+        
         packExtensionTypeHeader(ClientMsgPackType.UUID, 16);
-
-        // TODO: Pack directly to ByteBuf without allocating IGNITE-15234.
-        var bytes = new byte[16];
-        ByteBuffer bb = ByteBuffer.wrap(bytes);
-
-        bb.putLong(val.getMostSignificantBits());
-        bb.putLong(val.getLeastSignificantBits());
-
-        addPayload(bytes);
-
-        return this;
+        
+        buf.writeLong(val.getMostSignificantBits());
+        buf.writeLong(val.getLeastSignificantBits());
     }
-
+    
     /**
      * Writes an {@link IgniteUuid}.
      *
      * @param val {@link IgniteUuid} value.
-     * @return This instance.
      */
-    public ClientMessagePacker packIgniteUuid(IgniteUuid val) {
+    public void packIgniteUuid(IgniteUuid val) {
         assert !closed : "Packer is closed";
-
+        
         packExtensionTypeHeader(ClientMsgPackType.IGNITE_UUID, 24);
-
-        // TODO: Pack directly to ByteBuf without allocating IGNITE-15234.
-        var bytes = new byte[24];
-        ByteBuffer bb = ByteBuffer.wrap(bytes);
-
+        
         UUID globalId = val.globalId();
-
-        bb.putLong(globalId.getMostSignificantBits());
-        bb.putLong(globalId.getLeastSignificantBits());
-
-        bb.putLong(val.localId());
-
-        writePayload(bytes);
-
-        return this;
+        
+        buf.writeLong(globalId.getMostSignificantBits());
+        buf.writeLong(globalId.getLeastSignificantBits());
+        buf.writeLong(val.localId());
     }
-
+    
     /**
      * Writes a decimal.
      *
      * @param val Decimal value.
-     * @return This instance.
      */
-    public ClientMessagePacker packDecimal(BigDecimal val) {
+    public void packDecimal(BigDecimal val) {
         assert !closed : "Packer is closed";
-
-        // TODO: Pack directly to ByteBuf without allocating IGNITE-15234.
+        
         byte[] unscaledValue = val.unscaledValue().toByteArray();
-
-        packExtensionTypeHeader(ClientMsgPackType.DECIMAL, 4 + unscaledValue.length); // Scale length + data length
-
-        addPayload(ByteBuffer.wrap(new byte[4]).putInt(val.scale()).array());
-        addPayload(unscaledValue);
-
-        return this;
-    }
-
+        
+        packExtensionTypeHeader(ClientMsgPackType.DECIMAL,
+                4 + unscaledValue.length); // Scale length + data length
+        
+        buf.writeInt(val.scale());
+        buf.writeBytes(unscaledValue);
+    }
+    
     /**
      * Writes a decimal.
      *
      * @param val Decimal value.
-     * @return This instance.
      */
-    public ClientMessagePacker packNumber(BigInteger val) {
+    public void packNumber(BigInteger val) {
         assert !closed : "Packer is closed";
-
+        
         byte[] data = val.toByteArray();
-
+        
         packExtensionTypeHeader(ClientMsgPackType.NUMBER, data.length);
-
-        addPayload(data);
-
-        return this;
+        
+        buf.writeBytes(data);
     }
-
+    
     /**
      * Writes a bit set.
      *
      * @param val Bit set value.
-     * @return This instance.
      */
-    public ClientMessagePacker packBitSet(BitSet val) {
+    public void packBitSet(BitSet val) {
         assert !closed : "Packer is closed";
-
-        // TODO: Pack directly to ByteBuf without allocating IGNITE-15234.
+        
         byte[] data = val.toByteArray();
-
+        
         packExtensionTypeHeader(ClientMsgPackType.BITMASK, data.length);
-
-        addPayload(data);
-
-        return this;
+        
+        buf.writeBytes(data);
     }
-
+    
     /**
      * Writes an integer array.
      *
      * @param arr Integer array value.
-     * @return This instance.
      */
-    public ClientMessagePacker packIntArray(int[] arr) {
+    public void packIntArray(int[] arr) {
         assert !closed : "Packer is closed";
-
+        
         if (arr == null) {
             packNil();
-
-            return this;
+            
+            return;
         }
-
+        
         packArrayHeader(arr.length);
-
+    
         for (int i : arr) {
             packInt(i);
         }
-
-        return this;
     }
-
+    
     /**
      * Writes a date.
      *
      * @param val Date value.
-     * @return This instance.
      */
-    public ClientMessagePacker packDate(LocalDate val) {
+    public void packDate(LocalDate val) {
         assert !closed : "Packer is closed";
-
-        byte[] data = new byte[6];
-
-        // TODO: Pack directly to ByteBuf without allocating IGNITE-15234.
-        ByteBuffer.wrap(data)
-                .putInt(val.getYear())
-                .put((byte) val.getMonthValue())
-                .put((byte) val.getDayOfMonth());
-
-        packExtensionTypeHeader(ClientMsgPackType.DATE, data.length);
-
-        addPayload(data);
-
-        return this;
-    }
-
+        
+        packExtensionTypeHeader(ClientMsgPackType.DATE, 6);
+        
+        buf.writeInt(val.getYear());
+        buf.writeByte(val.getMonthValue());
+        buf.writeByte(val.getDayOfMonth());
+    }
+    
     /**
      * Writes a time.
      *
      * @param val Time value.
-     * @return This instance.
      */
-    public ClientMessagePacker packTime(LocalTime val) {
+    public void packTime(LocalTime val) {
         assert !closed : "Packer is closed";
-
-        byte[] data = new byte[7];
-
-        // TODO: Pack directly to ByteBuf without allocating IGNITE-15234.
-        ByteBuffer.wrap(data)
-                .put((byte) val.getHour())
-                .put((byte) val.getMinute())
-                .put((byte) val.getSecond())
-                .putInt(val.getNano());
-
-        packExtensionTypeHeader(ClientMsgPackType.TIME, data.length);
-
-        addPayload(data);
-
-        return this;
-    }
-
+        
+        packExtensionTypeHeader(ClientMsgPackType.TIME, 7);
+        
+        buf.writeByte(val.getHour());
+        buf.writeByte(val.getMinute());
+        buf.writeByte(val.getSecond());
+        buf.writeInt(val.getNano());
+    }
+    
     /**
      * Writes a datetime.
      *
      * @param val Datetime value.
-     * @return This instance.
      */
-    public ClientMessagePacker packDateTime(LocalDateTime val) {
+    public void packDateTime(LocalDateTime val) {
         assert !closed : "Packer is closed";
-
-        byte[] data = new byte[13];
-
-        // TODO: Pack directly to ByteBuf without allocating IGNITE-15234.
-        ByteBuffer.wrap(data)
-                .putInt(val.getYear())
-                .put((byte) val.getMonthValue())
-                .put((byte) val.getDayOfMonth())
-                .put((byte) val.getHour())
-                .put((byte) val.getMinute())
-                .put((byte) val.getSecond())
-                .putInt(val.getNano());
-
-        packExtensionTypeHeader(ClientMsgPackType.DATETIME, data.length);
-
-        addPayload(data);
-
-        return this;
-    }
-
+        
+        packExtensionTypeHeader(ClientMsgPackType.DATETIME, 13);
+        
+        buf.writeInt(val.getYear());
+        buf.writeByte(val.getMonthValue());
+        buf.writeByte(val.getDayOfMonth());
+        buf.writeByte(val.getHour());
+        buf.writeByte(val.getMinute());
+        buf.writeByte(val.getSecond());
+        buf.writeInt(val.getNano());
+    }
+    
     /**
      * Writes a timestamp.
      *
      * @param val Timestamp value.
-     * @return This instance.
-     * @throws UnsupportedOperationException Not supported.
      */
-    public ClientMessagePacker packTimestamp(Instant val) {
+    public void packTimestamp(Instant val) {
         assert !closed : "Packer is closed";
-
-        byte[] data = new byte[12];
-
-        // TODO: Pack directly to ByteBuf without allocating IGNITE-15234.
-        ByteBuffer.wrap(data)
-                .putLong(val.getEpochSecond())
-                .putInt(val.getNano());
-
-        packExtensionTypeHeader(ClientMsgPackType.TIMESTAMP, data.length);
-
-        addPayload(data);
-
-        return this;
+        
+        packExtensionTypeHeader(ClientMsgPackType.TIMESTAMP, 12);
+        
+        buf.writeLong(val.getEpochSecond());
+        buf.writeInt(val.getNano());
     }
-
+    
     /**
      * Packs an object.
      *
      * @param val Object value.
-     * @return This instance.
      * @throws UnsupportedOperationException When type is not supported.
      */
-    public ClientMessagePacker packObject(Object val) {
+    public void packObject(Object val) {
         if (val == null) {
-            return (ClientMessagePacker) packNil();
+            packNil();
+            
+            return;
         }
-
+        
         if (val instanceof Byte) {
-            return (ClientMessagePacker) packByte((byte) val);
+            packByte((byte) val);
+            
+            return;
         }
-
+        
         if (val instanceof Short) {
-            return (ClientMessagePacker) packShort((short) val);
+            packShort((short) val);
+            
+            return;
         }
-
+        
         if (val instanceof Integer) {
-            return (ClientMessagePacker) packInt((int) val);
+            packInt((int) val);
+            
+            return;
         }
-
+        
         if (val instanceof Long) {
-            return (ClientMessagePacker) packLong((long) val);
+            packLong((long) val);
+            
+            return;
         }
-
+        
         if (val instanceof Float) {
-            return (ClientMessagePacker) packFloat((float) val);
+            packFloat((float) val);
+            
+            return;
         }
-
+        
         if (val instanceof Double) {
-            return (ClientMessagePacker) packDouble((double) val);
+            packDouble((double) val);
+            
+            return;
         }
-
+        
         if (val instanceof UUID) {
-            return packUuid((UUID) val);
+            packUuid((UUID) val);
+            
+            return;
         }
-
+        
         if (val instanceof String) {
-            return (ClientMessagePacker) packString((String) val);
+            packString((String) val);
+            
+            return;
         }
-
+        
         if (val instanceof byte[]) {
             byte[] bytes = (byte[]) val;
             packBinaryHeader(bytes.length);
             writePayload(bytes);
-
-            return this;
+            
+            return;
         }
-
+        
         if (val instanceof BigDecimal) {
-            return packDecimal((BigDecimal) val);
+            packDecimal((BigDecimal) val);
+            
+            return;
         }
-
+        
         if (val instanceof BigInteger) {
-            return packNumber((BigInteger) val);
+            packNumber((BigInteger) val);
+            
+            return;
         }
-
+        
         if (val instanceof BitSet) {
-            return packBitSet((BitSet) val);
+            packBitSet((BitSet) val);
+            
+            return;
         }
-
+        
         if (val instanceof LocalDate) {
-            return packDate((LocalDate) val);
+            packDate((LocalDate) val);
+            
+            return;
         }
-
+        
         if (val instanceof LocalTime) {
-            return packTime((LocalTime) val);
+            packTime((LocalTime) val);
+            
+            return;
         }
-
+        
         if (val instanceof LocalDateTime) {
-            return packDateTime((LocalDateTime) val);
+            packDateTime((LocalDateTime) val);
+            
+            return;
         }
-
+        
         if (val instanceof Instant) {
-            return packTimestamp((Instant) val);
+            packTimestamp((Instant) val);
+            
+            return;
         }
-
-        throw new UnsupportedOperationException("Unsupported type, can't serialize: " + val.getClass());
+        
+        throw new UnsupportedOperationException(
+                "Unsupported type, can't serialize: " + val.getClass());
     }
-
+    
     /**
      * Packs an array of different objects.
      *
      * @param args Object array.
-     * @return This instance.
      * @throws UnsupportedOperationException in case of unknown type.
      */
-    public ClientMessagePacker packObjectArray(Object[] args) {
+    public void packObjectArray(Object[] args) {
         assert !closed : "Packer is closed";
-
+        
         if (args == null) {
             packNil();
-
-            return this;
+            
+            return;
         }
-
+        
         packArrayHeader(args.length);
-
+        
         for (Object arg : args) {
             if (arg == null) {
                 packNil();
-
+                

Review comment:
       Maybe you should tweak your IDE a bit to remove trailing whitespace :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org