You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/02/04 09:52:56 UTC
[ignite-3] branch main updated: IGNITE-16260 User object serialization performance optimization
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6f171d6 IGNITE-16260 User object serialization performance optimization
6f171d6 is described below
commit 6f171d6a197bf26e7aad9f8dcf7a1103096f1190
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Tue Feb 1 15:36:35 2022 +0400
IGNITE-16260 User object serialization performance optimization
Co-authored-by: Semyon Danilov <sa...@yandex.ru>
---
.../ignite/internal/util/FastTimestamps.java | 65 ++
.../ignite/internal/util/StringIntrospection.java | 139 ++++
.../ignite/internal/util/io/IgniteDataInput.java | 309 ++++++++
.../ignite/internal/util/io/IgniteDataOutput.java | 138 ++++
.../internal/util/io/IgniteUnsafeDataInput.java | 800 +++++++++++++++++++++
.../internal/util/io/IgniteUnsafeDataOutput.java | 702 ++++++++++++++++++
.../apache/ignite/internal/util/io}/VarInts.java | 12 +-
.../internal/util/StringIntrospectionTest.java | 63 ++
.../ignite/internal/util/io/IgniteTestIoUtils.java | 148 ++++
.../IgniteUnsafeDataInputOutputByteOrderTest.java | 240 +++++++
.../util/io/IgniteUnsafeDataInputTest.java} | 37 +-
.../io/IgniteUnsafeDataOutputArraySizingTest.java | 123 ++++
.../ignite/internal/util/io}/VarIntsTest.java | 2 +-
modules/network/pom.xml | 29 +
.../stream/DirectByteBufferStreamImplV1.java | 2 +-
.../network/serialization/BuiltInType.java | 5 +-
.../network/serialization/BuiltInTypeIds.java | 9 +
.../network/serialization/ClassDescriptor.java | 43 +-
.../serialization/ClassDescriptorRegistry.java | 2 +
.../internal/network/serialization/Classes.java | 9 +-
.../network/serialization/FieldDescriptor.java | 4 +-
.../PerSessionSerializationService.java | 11 +-
.../marshal/BuiltInContainerMarshallers.java | 24 +-
.../serialization/marshal/BuiltInMarshalling.java | 209 +++---
.../marshal/BuiltInNonContainerMarshallers.java | 51 +-
.../marshal/DefaultFieldsReaderWriter.java | 11 +-
.../marshal/DefaultUserObjectMarshaller.java | 101 ++-
.../marshal/ExternalizableMarshaller.java | 10 +-
.../marshal/{Bits.java => LittleEndianBits.java} | 67 +-
.../serialization/marshal/LocalDescriptors.java | 5 +
.../serialization/marshal/MarshalledObject.java | 28 +-
.../serialization/marshal/MarshallingContext.java | 44 +-
.../marshal/MarshallingValidations.java | 61 +-
.../serialization/marshal/ProtocolMarshalling.java | 1 +
.../serialization/marshal/ProxyMarshaller.java | 12 +-
.../marshal/StructuredObjectMarshaller.java | 63 +-
.../serialization/marshal/TypedValueReader.java | 8 +-
.../serialization/marshal/TypedValueWriter.java | 8 +-
.../marshal/UnmarshallingContext.java | 17 +-
...ValueWriter.java => UosIgniteOutputStream.java} | 34 +-
.../marshal/UosObjectInputStream.java | 25 +-
.../marshal/UosObjectOutputStream.java | 20 +-
.../network/serialization/marshal/ValueReader.java | 8 +-
.../network/serialization/marshal/ValueWriter.java | 8 +-
.../ignite/internal/network/AllTypesMessage.java | 3 +-
.../internal/network/AllTypesMessageGenerator.java | 22 +-
.../network/SerializationMicroBenchmark.java | 307 ++++++++
.../ignite/internal/network/UosProfilerTarget.java | 97 +++
.../serialization/BuiltInDescriptorsTest.java | 2 +
.../network/serialization/ClassesTest.java | 10 +
.../network/serialization/MarshallableTest.java | 4 +-
.../DefaultUserObjectMarshallerCommonTest.java | 74 ++
...rConcreteTypesKnownUpfrontOptimizationTest.java | 53 +-
...erObjectMarshallerWithArbitraryObjectsTest.java | 41 +-
...efaultUserObjectMarshallerWithBuiltinsTest.java | 80 ++-
...UserObjectMarshallerWithExternalizableTest.java | 9 +-
...shallerWithSerializableOverrideStreamsTest.java | 45 +-
...ltUserObjectMarshallerWithSerializableTest.java | 6 +-
parent/pom.xml | 8 +
59 files changed, 3993 insertions(+), 475 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
new file mode 100644
index 0000000..af6f005
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+/**
+ * Provides access to fast (low-latency), but coarse-grained timestamps.
+ */
+public class FastTimestamps {
+ private static volatile long coarseCurrentTimeMillis = System.currentTimeMillis();
+
+ private static final long UPDATE_FREQUENCY_MS = 10;
+
+ static {
+ startUpdater();
+ }
+
+ private static void startUpdater() {
+ Thread updater = new Thread("FastTimestamps updater") {
+ @Override
+ public void run() {
+ while (true) { // Refresh the cached timestamp, then sleep UPDATE_FREQUENCY_MS, until interrupted.
+ coarseCurrentTimeMillis = System.currentTimeMillis();
+ try {
+ Thread.sleep(UPDATE_FREQUENCY_MS);
+ } catch (InterruptedException e) {
+ break; // Interrupted: the loop ends and the cached value is no longer refreshed.
+ }
+ }
+ }
+ };
+ // Daemon thread, so it does not prevent JVM exit; priority 10 equals Thread.MAX_PRIORITY.
+ updater.setDaemon(true);
+ updater.setPriority(10);
+ updater.start();
+ }
+
+ /**
+ * Returns number of milliseconds passed since Unix Epoch (1970-01-01) with a coarse resolution.
+ * The resolution is currently 10ms. This method works a lot faster (2 ns vs 11000 ns on a developer machine)
+ * than {@link System#currentTimeMillis()}.
+ *
+ * @return number of milliseconds passed since Unix Epoch (1970-01-01) with a coarse resolution
+ */
+ public static long coarseCurrentTimeMillis() {
+ return coarseCurrentTimeMillis;
+ }
+
+ private FastTimestamps() {
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StringIntrospection.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StringIntrospection.java
new file mode 100644
index 0000000..66df530
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StringIntrospection.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+
+/**
+ * Utils for introspecting a String efficiently.
+ */
+public class StringIntrospection {
+ private static final boolean USE_UNSAFE_TO_GET_LATIN1_BYTES;
+ private static final long STRING_CODER_FIELD_OFFSET;
+ private static final long STRING_VALUE_FIELD_OFFSET;
+
+ private static final byte LATIN1 = 0;
+
+ private static final long NO_OFFSET = Long.MIN_VALUE;
+
+ static {
+ Optional<Boolean> maybeCompactStrings = compactStrings();
+ Optional<Long> maybeCoderFieldOffset = coderFieldOffset();
+ // The coder-based fast path needs COMPACT_STRINGS to be true, not merely present, plus a resolvable 'coder' field.
+ USE_UNSAFE_TO_GET_LATIN1_BYTES = maybeCompactStrings.orElse(false) && maybeCoderFieldOffset.isPresent();
+ STRING_CODER_FIELD_OFFSET = maybeCoderFieldOffset.orElse(NO_OFFSET);
+ // NO_OFFSET means String.value is missing or is not a byte[]; the fast*Bytes() methods then fall back to getBytes().
+ Optional<Long> maybeValueFieldOffset = byteValueFieldOffset();
+ STRING_VALUE_FIELD_OFFSET = maybeValueFieldOffset.orElse(NO_OFFSET);
+ }
+
+ private static Optional<Boolean> compactStrings() {
+ return compactStringsField()
+ .map(field -> {
+ try {
+ return (Boolean) field.get(null);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("Should not be thrown", e);
+ }
+ });
+ }
+
+ private static Optional<Field> compactStringsField() {
+ return stringField("COMPACT_STRINGS")
+ .map(field -> {
+ field.setAccessible(true);
+ return field;
+ });
+ }
+
+ private static Optional<Field> stringField(String name) {
+ try {
+ return Optional.of(String.class.getDeclaredField(name));
+ } catch (NoSuchFieldException e) {
+ return Optional.empty();
+ }
+ }
+
+ private static Optional<Long> coderFieldOffset() {
+ return stringField("coder").map(GridUnsafe::objectFieldOffset);
+ }
+
+ private static Optional<Long> byteValueFieldOffset() {
+ return stringField("value")
+ .filter(field -> field.getType() == byte[].class)
+ .map(GridUnsafe::objectFieldOffset);
+ }
+
+ /**
+ * Returns {@code true} if the current String is represented as Latin1 internally AND we can get access to that
+ * representation fast.
+ *
+ * @param str the string to check
+ * @return {@code true} if the current String is represented as Latin1 internally AND we can get access to that
+ * representation fast
+ */
+ public static boolean supportsFastGetLatin1Bytes(String str) {
+ if (!USE_UNSAFE_TO_GET_LATIN1_BYTES) {
+ return false;
+ }
+ return GridUnsafe.getByteField(str, STRING_CODER_FIELD_OFFSET) == LATIN1;
+ }
+
+ /**
+ * Returns a byte array with ASCII representation of the string. This *ONLY* returns a meaningful result
+ * if each string character fits ASCII encoding (that is, its code is less than 128)!
+ * This may return the internal buffer of the string, so *THE ARRAY CONTENTS SHOULD NEVER BE MODIFIED!*.
+ * The method is 'fast' because in the current Hotspot JVMs (versions 9+) it avoids copying string bytes (as well
+ * as encoding/decoding), it just returns the internal String buffer.
+ *
+ * @param str string to work with
+ * @return byte representation of an ASCII string
+ */
+ public static byte[] fastAsciiBytes(String str) {
+ if (STRING_VALUE_FIELD_OFFSET != NO_OFFSET) {
+ return (byte[]) GridUnsafe.getObjectField(str, STRING_VALUE_FIELD_OFFSET);
+ } else {
+ // Fallback: something is different, let's not fail here, just pay a performance penalty.
+ return str.getBytes(StandardCharsets.US_ASCII);
+ }
+ }
+
+ /**
+ * Returns a byte array with Latin1 representation of the string. This *ONLY* returns a meaningful result
+ * if each string character fits Latin1 encoding!
+ * This may return the internal buffer of the string, so *THE ARRAY CONTENTS SHOULD NEVER BE MODIFIED!*.
+ * The method is 'fast' because in the current Hotspot JVMs (versions 9+) it avoids copying string bytes (as well
+ * as encoding/decoding), it just returns the internal String buffer.
+ *
+ * @param str string to work with
+ * @return byte representation of a Latin1 string
+ */
+ public static byte[] fastLatin1Bytes(String str) {
+ if (STRING_VALUE_FIELD_OFFSET != NO_OFFSET) {
+ return (byte[]) GridUnsafe.getObjectField(str, STRING_VALUE_FIELD_OFFSET);
+ } else {
+ // Fallback: something is different, let's not fail here, just pay a performance penalty.
+ return str.getBytes(StandardCharsets.ISO_8859_1);
+ }
+ }
+
+ private StringIntrospection() {
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java
new file mode 100644
index 0000000..8306f66
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataInput.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Extended data input.
+ */
+public interface IgniteDataInput extends DataInput {
+ /**
+ * Sets the internal buffer.
+ *
+ * @param bytes Bytes.
+ * @param len Length.
+ */
+ void bytes(byte[] bytes, int len);
+
+ /**
+ * Sets the {@link InputStream} from which to pull data.
+ *
+ * @param in Underlying input stream.
+ * @throws IOException In case of error.
+ */
+ void inputStream(InputStream in) throws IOException;
+
+ /**
+ * Resets data input to the position remembered with {@link #mark(int)}.
+ *
+ * @throws IOException In case of error.
+ */
+ void reset() throws IOException;
+
+ /**
+ * Makes the data input ready for reuse.
+ *
+ * @throws IOException if something goes wrong
+ */
+ void cleanup() throws IOException;
+
+ /**
+ * Reads one byte and returns it or a negative value if the end of stream is reached.
+ *
+ * @return The next byte of data, or {@code -1} if the end of the stream is reached.
+ * @exception IOException In case of error.
+ */
+ int read() throws IOException;
+
+ /**
+ * Reads data to fill (probably, partly) the given array.
+ *
+ * @param b Buffer into which the data is read.
+ * @return Total number of bytes read into the buffer, or {@code -1} if there is no
+ * more data because the end of the stream has been reached.
+ * @exception IOException In case of error.
+ */
+ int read(byte[] b) throws IOException;
+
+ /**
+ * Reads data to fill (probably, partly) the given array region.
+ *
+ * @param b Buffer into which the data is read.
+ * @param off Start offset.
+ * @param len Maximum number of bytes to read.
+ * @return Total number of bytes read into the buffer, or {@code -1} if there is no
+ * more data because the end of the stream has been reached.
+ * @exception IOException In case of error.
+ */
+ int read(byte[] b, int off, int len) throws IOException;
+
+ /**
+ * Reads array of {@code byte}s.
+ *
+ * @return Array.
+ * @throws IOException In case of error.
+ */
+ byte[] readByteArray(int length) throws IOException;
+
+ /**
+ * Reads array of {@code short}s.
+ *
+ * @return Array.
+ * @throws IOException In case of error.
+ */
+ short[] readShortArray(int length) throws IOException;
+
+ /**
+ * Reads array of {@code int}s.
+ *
+ * @return Array.
+ * @throws IOException In case of error.
+ */
+ int[] readIntArray(int length) throws IOException;
+
+ /**
+ * Reads array of {@code long}s.
+ *
+ * @return Array.
+ * @throws IOException In case of error.
+ */
+ long[] readLongArray(int length) throws IOException;
+
+ /**
+ * Reads array of {@code float}s.
+ *
+ * @return Array.
+ * @throws IOException In case of error.
+ */
+ float[] readFloatArray(int length) throws IOException;
+
+ /**
+ * Reads array of {@code double}s.
+ *
+ * @return Array.
+ * @throws IOException In case of error.
+ */
+ double[] readDoubleArray(int length) throws IOException;
+
+ /**
+ * Reads array of {@code boolean}s.
+ *
+ * @return Array.
+ * @throws IOException In case of error.
+ */
+ boolean[] readBooleanArray(int length) throws IOException;
+
+ /**
+ * Reads array of {@code char}s.
+ *
+ * @return Array.
+ * @throws IOException In case of error.
+ */
+ char[] readCharArray(int length) throws IOException;
+
+ /**
+ * Reads the requested number of bytes from the input stream into the given
+ * byte array. This method blocks until {@code len} bytes of input data have
+ * been read, end of stream is detected, or an exception is thrown. The
+ * number of bytes actually read, possibly zero, is returned. This method
+ * does not close the input stream.
+ *
+ * <p>In the case where end of stream is reached before {@code len} bytes
+ * have been read, then the actual number of bytes read will be returned.
+ * When this stream reaches end of stream, further invocations of this
+ * method will return zero.
+ *
+ * <p>If {@code len} is zero, then no bytes are read and {@code 0} is
+ * returned; otherwise, there is an attempt to read up to {@code len} bytes.
+ *
+ * <p>The first byte read is stored into element {@code b[off]}, the next
+ * one in to {@code b[off+1]}, and so on. The number of bytes read is, at
+ * most, equal to {@code len}. Let <i>k</i> be the number of bytes actually
+ * read; these bytes will be stored in elements {@code b[off]} through
+ * {@code b[off+}<i>k</i>{@code -1]}, leaving elements {@code b[off+}<i>k</i>
+ * {@code ]} through {@code b[off+len-1]} unaffected.
+ *
+ * <p>The behavior for the case where the input stream is <i>asynchronously
+ * closed</i>, or the thread interrupted during the read, is highly input
+ * stream specific, and therefore not specified.
+ *
+ * <p>If an I/O error occurs reading from the input stream, then it may do
+ * so after some, but not all, bytes of {@code b} have been updated with
+ * data from the input stream. Consequently the input stream and {@code b}
+ * may be in an inconsistent state. It is strongly recommended that the
+ * stream be promptly closed if an I/O error occurs.
+ *
+ * @param b the byte array into which the data is read
+ * @param off the start offset in {@code b} at which the data is written
+ * @param len the maximum number of bytes to read
+ * @return the actual number of bytes read into the buffer
+ * @throws IOException if an I/O error occurs
+ * @throws NullPointerException if {@code b} is {@code null}
+ * @throws IndexOutOfBoundsException If {@code off} is negative, {@code len}
+ * is negative, or {@code len} is greater than {@code b.length - off}
+ */
+ int readFewBytes(byte[] b, int off, int len) throws IOException;
+
+ /**
+ * Reads all remaining bytes from the input stream. This method blocks until
+ * all remaining bytes have been read and end of stream is detected, or an
+ * exception is thrown. This method does not close the input stream.
+ *
+ * <p>When this stream reaches end of stream, further invocations of this
+ * method will return an empty byte array.
+ *
+ * <p>Note that this method is intended for simple cases where it is
+ * convenient to read all bytes into a byte array. It is not intended for
+ * reading input streams with large amounts of data.
+ *
+ * <p>The behavior for the case where the input stream is <i>asynchronously
+ * closed</i>, or the thread interrupted during the read, is highly input
+ * stream specific, and therefore not specified.
+ *
+ * <p>If an I/O error occurs reading from the input stream, then it may do
+ * so after some, but not all, bytes have been read. Consequently the input
+ * stream may not be at end of stream and may be in an inconsistent state.
+ * It is strongly recommended that the stream be promptly closed if an I/O
+ * error occurs.
+ *
+ * @return a byte array containing the bytes read from this input stream
+ * @throws IOException if an I/O error occurs
+ * @throws OutOfMemoryError if an array of the required size cannot be
+ * allocated.
+ */
+ byte[] readAllBytes() throws IOException;
+
+ /**
+ * Marks the current position in this input stream. A subsequent call to
+ * the <code>reset</code> method repositions this stream at the last marked
+ * position so that subsequent reads re-read the same bytes.
+ *
+ * <p>The <code>readlimit</code> arguments tells this input stream to
+ * allow that many bytes to be read before the mark position gets
+ * invalidated.
+ *
+ * <p>The general contract of <code>mark</code> is that, if the method
+ * <code>markSupported</code> returns <code>true</code>, the stream somehow
+ * remembers all the bytes read after the call to <code>mark</code> and
+ * stands ready to supply those same bytes again if and whenever the method
+ * <code>reset</code> is called. However, the stream is not required to
+ * remember any data at all if more than <code>readlimit</code> bytes are
+ * read from the stream before <code>reset</code> is called.
+ *
+ * <p>Marking a closed stream should not have any effect on the stream.
+ *
+ * <p>The <code>mark</code> method of <code>InputStream</code> does
+ * nothing.
+ *
+ * @param readLimit the maximum limit of bytes that can be read before
+ * the mark position becomes invalid.
+ * @see java.io.InputStream#reset()
+ */
+ void mark(int readLimit);
+
+ /**
+ * Returns an estimate of the number of bytes that can be read (or skipped
+ * over) from this input stream without blocking, which may be 0, or 0 when
+ * end of stream is detected. The read might be on the same thread or
+ * another thread. A single read or skip of this many bytes will not block,
+ * but may read or skip fewer bytes.
+ *
+ * <p>Note that while some implementations of {@code InputStream} will
+ * return the total number of bytes in the stream, many will not. It is
+ * never correct to use the return value of this method to allocate
+ * a buffer intended to hold all data in this stream.
+ *
+ * <p>A subclass's implementation of this method may choose to throw an
+ * {@link IOException} if this input stream has been closed by invoking the
+ * {@link InputStream#close()} method.
+ *
+ * <p>The {@code available} method of {@code InputStream} always returns
+ * {@code 0}.
+ *
+ * <p>This method should be overridden by subclasses.
+ *
+ * @return an estimate of the number of bytes that can be read (or
+ * skipped over) from this input stream without blocking or
+ * {@code 0} when it reaches the end of the input stream.
+ * @exception IOException if an I/O error occurs.
+ */
+ int available() throws IOException;
+
+ /**
+ * Materializes an object from next bytes (the count is known upfront). The bytes used for materialization are consumed.
+ * This method is useful to avoid an allocation when first reading bytes to a byte array and then converting them
+ * to an object.
+ *
+ * @param bytesCount number of bytes to consume and use for materialization
+ * @param materializer materializer to turn the bytes to an object
+ * @param <T> materialized object type
+ * @return the materialized object
+ * @throws IOException if an I/O error occurs
+ */
+ <T> T materializeFromNextBytes(int bytesCount, Materializer<? extends T> materializer) throws IOException;
+
+ /**
+ * Materializer used to turn a region of a byte array to an object.
+ *
+ * @param <T> the type of a materialized object
+ */
+ interface Materializer<T> {
+ /**
+ * Materializes an object from a region of the given byte array.
+ *
+ * @param buffer byte array
+ * @param offset offset to that array where the bytes representing the object to be materialized start
+ * @param length length (in bytes) of the region to use for materialization
+ * @return the materialized object
+ */
+ T materialize(byte[] buffer, int offset, int length);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java
new file mode 100644
index 0000000..5012cde
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteDataOutput.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Extended data output.
+ */
+public interface IgniteDataOutput extends DataOutput {
+ /**
+ * Sets the {@link OutputStream} to push data to.
+ *
+ * @param out Underlying stream.
+ */
+ void outputStream(OutputStream out);
+
+ /**
+ * Returns a copy of the internal array.
+ *
+ * @return Copy of internal array shrunk to offset.
+ */
+ byte[] array();
+
+ /**
+ * Returns the internal array.
+ *
+ * @return Internal array.
+ */
+ byte[] internalArray();
+
+ /**
+ * Returns the current offset in the internal array.
+ *
+ * @return Offset.
+ */
+ int offset();
+
+ /**
+ * Adjusts the offset to the internal array.
+ *
+ * @param off Offset.
+ */
+ void offset(int off);
+
+ /**
+ * Makes the data output ready for reuse.
+ */
+ void cleanup();
+
+ /**
+ * Writes array of {@code byte}s.
+ *
+ * @param arr Array.
+ * @throws IOException In case of error.
+ */
+ void writeByteArray(byte[] arr) throws IOException;
+
+ /**
+ * Writes array of {@code short}s.
+ *
+ * @param arr Array.
+ * @throws IOException In case of error.
+ */
+ void writeShortArray(short[] arr) throws IOException;
+
+ /**
+ * Writes array of {@code int}s.
+ *
+ * @param arr Array.
+ * @throws IOException In case of error.
+ */
+ void writeIntArray(int[] arr) throws IOException;
+
+ /**
+ * Writes array of {@code long}s.
+ *
+ * @param arr Array.
+ * @throws IOException In case of error.
+ */
+ void writeLongArray(long[] arr) throws IOException;
+
+ /**
+ * Writes array of {@code float}s.
+ *
+ * @param arr Array.
+ * @throws IOException In case of error.
+ */
+ void writeFloatArray(float[] arr) throws IOException;
+
+ /**
+ * Writes array of {@code double}s.
+ *
+ * @param arr Array.
+ * @throws IOException In case of error.
+ */
+ void writeDoubleArray(double[] arr) throws IOException;
+
+ /**
+ * Writes array of {@code boolean}s.
+ *
+ * @param arr Array.
+ * @throws IOException In case of error.
+ */
+ void writeBooleanArray(boolean[] arr) throws IOException;
+
+ /**
+ * Writes array of {@code char}s.
+ *
+ * @param arr Array.
+ * @throws IOException In case of error.
+ */
+ void writeCharArray(char[] arr) throws IOException;
+
+ /**
+ * Flushes the output. This flushes the underlying {@link OutputStream} (if it exists).
+ *
+ * @throws IOException if something went wrong
+ */
+ void flush() throws IOException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
new file mode 100644
index 0000000..8c11bba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInput.java
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UTFDataFormatException;
+import java.util.Objects;
+import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
+import org.apache.ignite.internal.tostring.IgniteToStringExclude;
+import org.apache.ignite.internal.util.FastTimestamps;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/**
+ * Data input based on {@code Unsafe} operations.
+ */
+public class IgniteUnsafeDataInput extends InputStream implements IgniteDataInput {
+ /** Maximum data block length. */
+ private static final int MAX_BLOCK_SIZE = 1024;
+
+ /** Length of char buffer (for reading strings). */
+ private static final int CHAR_BUF_SIZE = 256;
+
+ private static final long NO_AUTO_SHRINK = -1;
+
+ private final long shrinkCheckFrequencyMs;
+
+ /** Buffer for reading general/block data. */
+ @IgniteToStringExclude
+ private final byte[] utfBuf = new byte[MAX_BLOCK_SIZE];
+
+ /** Char buffer for fast string reads. */
+ @IgniteToStringExclude
+ private final char[] utfCharBuf = new char[CHAR_BUF_SIZE];
+
+ /** Current offset into buf. */
+ private int pos;
+
+ /** End offset of valid data in buf, or -1 if no more block data. */
+ private int end = -1;
+
+ /** Bytes. */
+ @IgniteToStringExclude
+ private byte[] buf;
+
+ /** Offset. */
+ private int off;
+
+ /** Max. */
+ private int max;
+
+ /** Underlying input stream. */
+ @IgniteToStringExclude
+ private InputStream in;
+
+ /** Buffer for reading from stream. */
+ @IgniteToStringExclude
+ private byte[] inBuf = new byte[1024];
+
+ /** Maximum message size. */
+ private int maxOff;
+
+ private int mark;
+
+ /** Last length check timestamp. */
+ private long lastAutoShrinkCheckTimestamp = FastTimestamps.coarseCurrentTimeMillis();
+
+    /**
+     * Creates an input that has no internal buffer assigned yet and never auto-shrinks its stream buffer.
+     */
+    public IgniteUnsafeDataInput() {
+        this(NO_AUTO_SHRINK);
+    }
+
+    /**
+     * Creates an input that has no internal buffer assigned yet.
+     *
+     * @param shrinkCheckFrequencyMs interval (in milliseconds) between checks of whether the stream buffer should be shrunk;
+     *     {@code -1} turns auto-shrinking off
+     */
+    public IgniteUnsafeDataInput(long shrinkCheckFrequencyMs) {
+        this.shrinkCheckFrequencyMs = shrinkCheckFrequencyMs;
+    }
+
+    /**
+     * Creates an input over the given array with auto-shrinking turned off.
+     *
+     * @param bytes array to read from (used as the internal buffer)
+     */
+    public IgniteUnsafeDataInput(byte[] bytes) {
+        this(bytes, NO_AUTO_SHRINK);
+    }
+
+    /**
+     * Creates an input over the given array.
+     *
+     * @param bytes array to read from (used as the internal buffer); its whole length is considered valid data
+     * @param shrinkCheckFrequencyMs interval (in milliseconds) between checks of whether the stream buffer should be shrunk;
+     *     {@code -1} turns auto-shrinking off
+     */
+    public IgniteUnsafeDataInput(byte[] bytes, long shrinkCheckFrequencyMs) {
+        this(shrinkCheckFrequencyMs);
+
+        int validLen = bytes.length;
+
+        bytes(bytes, validLen);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Equivalent to {@code bytes(bytes, 0, len)}.
+     */
+    @Override
+    public void bytes(byte[] bytes, int len) {
+        bytes(bytes, 0, len);
+    }
+
+ /**
+ * Sets the internal buffer to read from and positions the read offset.
+ *
+ * <p>NOTE(review): {@code max} is assigned {@code len} as is (not {@code off + len}), and reads are bounds-checked
+ * against {@code max}. With a non-zero {@code off} this makes {@code len} act as an end offset rather than a
+ * length - confirm against callers before relying on either meaning.
+ *
+ * @param bytes Bytes.
+ * @param off Offset.
+ * @param len Length.
+ */
+ public void bytes(byte[] bytes, int off, int len) {
+ buf = bytes;
+
+ max = len;
+ this.off = off;
+ }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Switches the active buffer to the internal stream buffer; data is pulled from the stream on each read.
+     */
+    @Override
+    public void inputStream(InputStream in) throws IOException {
+        buf = inBuf;
+
+        this.in = in;
+    }
+
+    /**
+     * Refills the internal buffer with exactly {@code size} bytes taken from the underlying stream, resizing the
+     * buffer first if needed. If stream is {@code null}, this method is no-op.
+     *
+     * <p>The buffer grows when {@code size} exceeds its capacity. When auto-shrinking is enabled, once per
+     * {@code shrinkCheckFrequencyMs} the buffer is halved if the largest request seen since the previous check
+     * is below half of its capacity.
+     *
+     * @param size Number of bytes to read.
+     * @throws IOException In case of error, including the stream ending before {@code size} bytes were read.
+     */
+    private void fromStream(int size) throws IOException {
+        if (in == null) {
+            return;
+        }
+
+        maxOff = Math.max(maxOff, size);
+
+        // Increase size of buffer if needed.
+        if (size > inBuf.length) {
+            buf = inBuf = new byte[Math.max(inBuf.length << 1, size)]; // Grow.
+        } else if (isAutoShrinkEnabled()) {
+            long now = FastTimestamps.coarseCurrentTimeMillis();
+
+            if (now - lastAutoShrinkCheckTimestamp > shrinkCheckFrequencyMs) {
+                int halfSize = inBuf.length >> 1;
+
+                if (maxOff < halfSize) {
+                    // Old content is not copied: the buffer is refilled from the stream below (off and max are
+                    // reset), and copying 'off' stale bytes could overflow the smaller array when the previous
+                    // read was larger than halfSize.
+                    buf = inBuf = new byte[halfSize]; // Shrink.
+                }
+
+                maxOff = 0;
+                lastAutoShrinkCheckTimestamp = now;
+            }
+        }
+
+        off = 0;
+        max = 0;
+
+        // A single read() may return fewer bytes than requested, so loop until the request is satisfied.
+        while (max != size) {
+            int read = in.read(inBuf, max, size - max);
+
+            if (read == -1) {
+                throw new EOFException("End of stream reached: " + in);
+            }
+
+            max += read;
+        }
+    }
+
+    /** Tells whether periodic shrinking of the stream buffer is turned on. */
+    private boolean isAutoShrinkEnabled() {
+        return !(shrinkCheckFrequencyMs == NO_AUTO_SHRINK);
+    }
+
+    /**
+     * Advances the offset doing a check for whether we fell off the buffer edge.
+     *
+     * <p>The offset is only changed when the check passes, so a failed read leaves the position intact.
+     *
+     * @param more Bytes to move forward.
+     * @return Old offset value.
+     * @throws IOException In case of error ({@link EOFException} if the move would cross the end of valid data).
+     */
+    private int advanceOffset(int more) throws IOException {
+        int old = off;
+
+        // Computed as long so that an int overflow cannot disguise an overrun as a valid position.
+        long advanced = (long) old + more;
+
+        if (advanced > max) {
+            throw new EOFException("Attempt to read beyond the end of the stream "
+                    + "[pos=" + advanced + ", more=" + more + ", max=" + max + ']');
+        }
+
+        off = (int) advanced;
+
+        return old;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to the underlying stream if there is one; otherwise returns to the offset remembered by {@code mark}.
+     */
+    @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
+    @Override
+    public void reset() throws IOException {
+        if (in == null) {
+            off = mark;
+
+            return;
+        }
+
+        in.reset();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Detaches the stream and zeroes the offset, the end-of-data marker and the mark.
+     */
+    @Override
+    public void cleanup() throws IOException {
+        mark = 0;
+        max = 0;
+        off = 0;
+
+        in = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public byte[] readByteArray(int arrSize) throws IOException {
+        fromStream(arrSize);
+
+        byte[] res = new byte[arrSize];
+
+        int from = advanceOffset(arrSize);
+
+        System.arraycopy(buf, from, res, 0, arrSize);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public short[] readShortArray(int arrSize) throws IOException {
+        int byteCnt = arrSize << 1;
+
+        fromStream(byteCnt);
+
+        short[] res = new short[arrSize];
+
+        long addr = BYTE_ARR_OFF + advanceOffset(byteCnt);
+
+        if (!BIG_ENDIAN) {
+            // Data is stored little-endian, so a raw memory copy is enough.
+            GridUnsafe.copyMemory(buf, addr, res, SHORT_ARR_OFF, byteCnt);
+
+            return res;
+        }
+
+        for (int i = 0; i < res.length; i++, addr += 2) {
+            res[i] = GridUnsafe.getShortLittleEndian(buf, addr);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int[] readIntArray(int arrSize) throws IOException {
+        int byteCnt = arrSize << 2;
+
+        fromStream(byteCnt);
+
+        int[] res = new int[arrSize];
+
+        long addr = BYTE_ARR_OFF + advanceOffset(byteCnt);
+
+        if (!BIG_ENDIAN) {
+            // Data is stored little-endian, so a raw memory copy is enough.
+            GridUnsafe.copyMemory(buf, addr, res, INT_ARR_OFF, byteCnt);
+
+            return res;
+        }
+
+        for (int i = 0; i < res.length; i++, addr += 4) {
+            res[i] = GridUnsafe.getIntLittleEndian(buf, addr);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public double[] readDoubleArray(int arrSize) throws IOException {
+        int byteCnt = arrSize << 3;
+
+        fromStream(byteCnt);
+
+        double[] res = new double[arrSize];
+
+        long addr = BYTE_ARR_OFF + advanceOffset(byteCnt);
+
+        if (!BIG_ENDIAN) {
+            // Data is stored little-endian, so a raw memory copy is enough.
+            GridUnsafe.copyMemory(buf, addr, res, DOUBLE_ARR_OFF, byteCnt);
+
+            return res;
+        }
+
+        for (int i = 0; i < res.length; i++, addr += 8) {
+            res[i] = GridUnsafe.getDoubleLittleEndian(buf, addr);
+        }
+
+        return res;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Elements are read one by one via {@link #readBoolean()}.
+     */
+    @Override
+    public boolean[] readBooleanArray(int arrSize) throws IOException {
+        boolean[] res = new boolean[arrSize];
+
+        int idx = 0;
+
+        while (idx < arrSize) {
+            res[idx] = readBoolean();
+
+            idx++;
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public char[] readCharArray(int arrSize) throws IOException {
+        int byteCnt = arrSize << 1;
+
+        fromStream(byteCnt);
+
+        char[] res = new char[arrSize];
+
+        long addr = BYTE_ARR_OFF + advanceOffset(byteCnt);
+
+        if (!BIG_ENDIAN) {
+            // Data is stored little-endian, so a raw memory copy is enough.
+            GridUnsafe.copyMemory(buf, addr, res, CHAR_ARR_OFF, byteCnt);
+
+            return res;
+        }
+
+        for (int i = 0; i < res.length; i++, addr += 2) {
+            res[i] = GridUnsafe.getCharLittleEndian(buf, addr);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long[] readLongArray(int arrSize) throws IOException {
+        int byteCnt = arrSize << 3;
+
+        fromStream(byteCnt);
+
+        long[] res = new long[arrSize];
+
+        long addr = BYTE_ARR_OFF + advanceOffset(byteCnt);
+
+        if (!BIG_ENDIAN) {
+            // Data is stored little-endian, so a raw memory copy is enough.
+            GridUnsafe.copyMemory(buf, addr, res, LONG_ARR_OFF, byteCnt);
+
+            return res;
+        }
+
+        for (int i = 0; i < res.length; i++, addr += 8) {
+            res[i] = GridUnsafe.getLongLittleEndian(buf, addr);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public float[] readFloatArray(int arrSize) throws IOException {
+        int byteCnt = arrSize << 2;
+
+        fromStream(byteCnt);
+
+        float[] res = new float[arrSize];
+
+        long addr = BYTE_ARR_OFF + advanceOffset(byteCnt);
+
+        if (!BIG_ENDIAN) {
+            // Data is stored little-endian, so a raw memory copy is enough.
+            GridUnsafe.copyMemory(buf, addr, res, FLOAT_ARR_OFF, byteCnt);
+
+            return res;
+        }
+
+        for (int i = 0; i < res.length; i++, addr += 4) {
+            res[i] = GridUnsafe.getFloatLittleEndian(buf, addr);
+        }
+
+        return res;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Keeps calling {@link #read(byte[], int, int)} until {@code len} bytes are collected or a negative count
+     * (end of data) is reported.
+     */
+    @Override
+    public int readFewBytes(byte[] b, int off, int len) throws IOException {
+        Objects.checkFromIndexSize(off, len, b.length);
+
+        int total = 0;
+
+        while (total < len) {
+            int cnt = read(b, off + total, len - total);
+
+            if (cnt < 0) {
+                return total;
+            }
+
+            total += cnt;
+        }
+
+        return total;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void readFully(byte[] b) throws IOException {
+        int cnt = b.length;
+
+        fromStream(cnt);
+
+        int from = advanceOffset(cnt);
+
+        System.arraycopy(buf, from, b, 0, cnt);
+    }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("NullableProblems")
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ fromStream(len);
+
+ System.arraycopy(buf, advanceOffset(len), b, off, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int skipBytes(int n) {
+ if (off + n > max) {
+ n = max - off;
+ }
+
+ off += n;
+
+ return n;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean readBoolean() throws IOException {
+ fromStream(1);
+
+ return GridUnsafe.getBoolean(buf, BYTE_ARR_OFF + advanceOffset(1));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public byte readByte() throws IOException {
+ fromStream(1);
+
+ return GridUnsafe.getByte(buf, BYTE_ARR_OFF + advanceOffset(1));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int readUnsignedByte() throws IOException {
+ return readByte() & 0xff;
+ }
+
+    /** {@inheritDoc} */
+    @Override
+    public short readShort() throws IOException {
+        fromStream(2);
+
+        long addr = BYTE_ARR_OFF + advanceOffset(2);
+
+        if (BIG_ENDIAN) {
+            return GridUnsafe.getShortLittleEndian(buf, addr);
+        }
+
+        return GridUnsafe.getShort(buf, addr);
+    }
+
+ /** {@inheritDoc} */
+ @Override
+ public int readUnsignedShort() throws IOException {
+ return readShort() & 0xffff;
+ }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The offset is advanced (and bounds-checked) before the memory is touched, like in the other primitive
+     * readers, so that no unchecked read past the end of valid data can happen.
+     */
+    @Override
+    public char readChar() throws IOException {
+        fromStream(2);
+
+        // advanceOffset() throws EOFException before any unsafe access if fewer than 2 bytes remain.
+        long off = BYTE_ARR_OFF + advanceOffset(2);
+
+        return BIG_ENDIAN ? GridUnsafe.getCharLittleEndian(buf, off) : GridUnsafe.getChar(buf, off);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int readInt() throws IOException {
+        fromStream(4);
+
+        long addr = BYTE_ARR_OFF + advanceOffset(4);
+
+        if (BIG_ENDIAN) {
+            return GridUnsafe.getIntLittleEndian(buf, addr);
+        }
+
+        return GridUnsafe.getInt(buf, addr);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long readLong() throws IOException {
+        fromStream(8);
+
+        long addr = BYTE_ARR_OFF + advanceOffset(8);
+
+        if (BIG_ENDIAN) {
+            return GridUnsafe.getLongLittleEndian(buf, addr);
+        }
+
+        return GridUnsafe.getLong(buf, addr);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Reads an {@code int} and reinterprets its bits as a {@code float}.
+     */
+    @Override
+    public float readFloat() throws IOException {
+        return Float.intBitsToFloat(readInt());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Reads a {@code long} and reinterprets its bits as a {@code double}.
+     */
+    @Override
+    public double readDouble() throws IOException {
+        return Double.longBitsToDouble(readLong());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>End of data, signalled internally with {@link EOFException}, is translated to {@code -1}.
+     */
+    @Override
+    public int read() throws IOException {
+        int res;
+
+        try {
+            res = readUnsignedByte();
+        } catch (EOFException ignored) {
+            // End of data is a normal outcome for InputStream.read(): report it as -1.
+            res = -1;
+        }
+
+        return res;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>With a stream attached the call is passed straight to it; otherwise up to {@code len} bytes are copied
+     * from the internal buffer, or {@code -1} is returned if nothing is left.
+     */
+    @SuppressWarnings("NullableProblems")
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException();
+        }
+
+        boolean badRange = off < 0 || len < 0 || len > b.length - off;
+
+        if (badRange) {
+            throw new IndexOutOfBoundsException();
+        }
+
+        if (len == 0) {
+            return 0;
+        }
+
+        if (in != null) {
+            return in.read(b, off, len);
+        }
+
+        int remaining = max - this.off;
+
+        // len is positive here, so there is something to copy exactly when some bytes remain.
+        if (remaining <= 0) {
+            return -1;
+        }
+
+        int toRead = Math.min(len, remaining);
+
+        System.arraycopy(buf, advanceOffset(toRead), b, off, toRead);
+
+        return toRead;
+    }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Bytes are converted to chars one-to-one. The line ends at {@code '\n'}, at {@code "\r\n"}, or at the end of data.
+ *
+ * <p>NOTE(review): a {@code '\r'} that is NOT followed by {@code '\n'} does not end the line here: the CR is dropped,
+ * the following byte is appended, and reading continues. This differs from the {@code DataInput#readLine()}
+ * description, where a lone CR terminates the line - confirm whether this is intended.
+ */
+ @Override
+ public String readLine() throws IOException {
+ StringBuilder sb = new StringBuilder();
+
+ int b;
+
+ while ((b = read()) >= 0) {
+ char c = (char) b;
+
+ switch (c) {
+ case '\n':
+ return sb.toString();
+
+ case '\r':
+ // Look at the byte after CR: end of data or LF finishes the line.
+ b = read();
+
+ if (b < 0 || b == '\n') {
+ return sb.toString();
+ } else {
+ sb.append((char) b);
+ }
+
+ break;
+
+ default:
+ sb.append(c);
+ }
+ }
+
+ return sb.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String readUTF() throws IOException {
+ return readUtfBody(VarInts.readUnsignedInt(this));
+ }
+
+ /**
+ * Reads in the "body" (i.e., the UTF representation minus the length header, which the caller has
+ * already consumed) of a UTF encoding, which occupies the next utfLen bytes.
+ *
+ * <p>The bytes are pulled into {@code utfBuf} in portions of at most {@code MAX_BLOCK_SIZE} and decoded
+ * span by span with {@link #readUtfSpan(StringBuilder, long)}.
+ *
+ * @param utfLen UTF encoding length.
+ * @return String.
+ * @throws IOException In case of error.
+ */
+ private String readUtfBody(long utfLen) throws IOException {
+ StringBuilder sbuf = new StringBuilder();
+
+ end = pos = 0;
+
+ while (utfLen > 0) {
+ int avail = end - pos;
+
+ // Decode when a longest (3-byte) sequence surely fits, or when everything that is left is already buffered.
+ if (avail >= 3 || (long) avail == utfLen) {
+ utfLen -= readUtfSpan(sbuf, utfLen);
+ } else {
+ // shift and refill buffer manually
+ if (avail > 0) {
+ System.arraycopy(utfBuf, pos, utfBuf, 0, avail);
+ }
+
+ pos = 0;
+ end = (int) Math.min(MAX_BLOCK_SIZE, utfLen);
+
+ readFully(utfBuf, avail, end - avail);
+ }
+ }
+
+ return sbuf.toString();
+ }
+
+ /**
+ * Reads span of UTF-encoded characters out of internal buffer
+ * (starting at offset pos and ending at or before offset end),
+ * consuming no more than utfLen bytes. Appends read characters to
+ * sbuf. Returns the number of bytes consumed.
+ *
+ * <p>At most {@code CHAR_BUF_SIZE} bytes are looked at per call. Only 1-, 2- and 3-byte sequences are accepted;
+ * any other lead byte, a bad continuation byte, or a sequence running out of bounds results in
+ * {@link UTFDataFormatException}.
+ *
+ * @param sbuf String builder.
+ * @param utfLen UTF encoding length.
+ * @return Number of bytes consumed.
+ * @throws IOException In case of error.
+ */
+ @SuppressWarnings("ThrowFromFinallyBlock")
+ private long readUtfSpan(StringBuilder sbuf, long utfLen) throws IOException {
+ int cpos = 0;
+ int start = pos;
+ int avail = Math.min(end - pos, CHAR_BUF_SIZE);
+ // When more data follows this span, stop 2 bytes early so that a multi-byte sequence is never split.
+ int stop = pos + ((utfLen > avail) ? avail - 2 : (int) utfLen);
+ boolean outOfBounds = false;
+
+ try {
+ while (pos < stop) {
+ int b1 = utfBuf[pos++] & 0xFF;
+
+ int b2;
+ int b3;
+
+ switch (b1 >> 4) {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
+ // 1 byte format: 0xxxxxxx
+ utfCharBuf[cpos++] = (char) b1;
+
+ break;
+
+ case 12:
+ case 13:
+ // 2 byte format: 110xxxxx 10xxxxxx
+ b2 = utfBuf[pos++];
+
+ if ((b2 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException();
+ }
+
+ utfCharBuf[cpos++] = (char) (((b1 & 0x1F) << 6) | (b2 & 0x3F));
+
+ break;
+
+ case 14:
+ // 3 byte format: 1110xxxx 10xxxxxx 10xxxxxx
+ b3 = utfBuf[pos + 1];
+ b2 = utfBuf[pos];
+
+ pos += 2;
+
+ if ((b2 & 0xC0) != 0x80 || (b3 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException();
+ }
+
+ utfCharBuf[cpos++] = (char) (((b1 & 0x0F) << 12) | ((b2 & 0x3F) << 6) | (b3 & 0x3F));
+
+ break;
+
+ default:
+ // 10xx xxxx, 1111 xxxx
+ throw new UTFDataFormatException();
+ }
+ }
+ } catch (ArrayIndexOutOfBoundsException ignored) {
+ // A sequence ran past the end of one of the buffers; reported as malformed data in the finally block.
+ outOfBounds = true;
+ } finally {
+ // Consuming more bytes than the declared length means the data is malformed as well.
+ if (outOfBounds || (pos - start) > utfLen) {
+ pos = start + (int) utfLen;
+
+ throw new UTFDataFormatException();
+ }
+ }
+
+ sbuf.append(utfCharBuf, 0, cpos);
+
+ return pos - start;
+ }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to the underlying stream if there is one; otherwise remembers the current buffer offset.
+     */
+    @Override
+    public void mark(int readLimit) {
+        if (in == null) {
+            mark = off;
+
+            return;
+        }
+
+        in.mark(readLimit);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Always supported when reading from an array; with a stream attached, the stream decides.
+     */
+    @Override
+    public boolean markSupported() {
+        if (in == null) {
+            return true;
+        }
+
+        return in.markSupported();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>With a stream attached, the stream's estimate is returned. Otherwise, this is the number of bytes between
+     * the current offset and the end of valid data ({@code max}), which is the same bound the read methods use;
+     * 0 is returned when no buffer is assigned.
+     */
+    @Override
+    public int available() throws IOException {
+        if (in != null) {
+            return in.available();
+        }
+
+        // Valid data ends at 'max', which may be less than the capacity of the backing array.
+        return Math.max(max - off, 0);
+    }
+
+ /** {@inheritDoc} */
+ @Override
+ public <T> T materializeFromNextBytes(int bytesCount, Materializer<? extends T> materializer) throws IOException {
+ fromStream(bytesCount);
+
+ int prevOffset = advanceOffset(bytesCount);
+
+ return materializer.materialize(buf, prevOffset, bytesCount);
+ }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Built by {@link IgniteToStringBuilder}.
+     */
+    @Override
+    public String toString() {
+        return IgniteToStringBuilder.toString(IgniteUnsafeDataInput.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
new file mode 100644
index 0000000..6753179
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutput.java
@@ -0,0 +1,702 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
+import org.apache.ignite.internal.util.FastTimestamps;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.StringIntrospection;
+
+/**
+ * Data output based on {@code Unsafe} operations.
+ */
+public class IgniteUnsafeDataOutput extends OutputStream implements IgniteDataOutput {
+ /**
+ * Based on ByteArrayOutputStream#MAX_ARRAY_SIZE or many other similar constants in other classes.
+ * It's not safe to allocate more than this number of elements in byte array, because it can throw
+ * java.lang.OutOfMemoryError: Requested array size exceeds VM limit
+ */
+ private static final int MAX_BYTE_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+ /** Length of char buffer (for writing strings). */
+ private static final int CHAR_BUF_SIZE = 256;
+
+ /** Char buffer for fast string writes. */
+ private final char[] cbuf = new char[CHAR_BUF_SIZE];
+
+ private static final long NO_AUTO_SHRINK = -1;
+
+ private final long shrinkCheckFrequencyMs;
+
+ /** Bytes. */
+ private byte[] bytes;
+
+ /** Offset. */
+ private int off;
+
+ /** Underlying output stream. */
+ private OutputStream out;
+
+ /** Maximum message size. */
+ private int maxOff;
+
+ /** Last length check timestamp. */
+ private long lastAutoShrinkCheckTimestamp = FastTimestamps.coarseCurrentTimeMillis();
+
+    /**
+     * Creates an output that has no internal buffer allocated and never auto-shrinks.
+     */
+    public IgniteUnsafeDataOutput() {
+        this(NO_AUTO_SHRINK);
+    }
+
+    /**
+     * Creates an output that has no internal buffer allocated.
+     *
+     * @param shrinkCheckFrequencyMs interval (in milliseconds) between checks of whether the byte buffer should be shrunk;
+     *     {@code -1} turns auto-shrinking off
+     * @throws IllegalArgumentException if the frequency is neither {@code -1} nor positive
+     */
+    public IgniteUnsafeDataOutput(long shrinkCheckFrequencyMs) {
+        boolean valid = shrinkCheckFrequencyMs == NO_AUTO_SHRINK || shrinkCheckFrequencyMs > 0;
+
+        if (!valid) {
+            throw new IllegalArgumentException(
+                    "Auto-shrink frequency must be either -1 (when auto-shrinking is disabled) or positive, but it's "
+                            + shrinkCheckFrequencyMs
+            );
+        }
+
+        this.shrinkCheckFrequencyMs = shrinkCheckFrequencyMs;
+    }
+
+    /**
+     * Creates an output with a buffer of the given size and auto-shrinking turned off.
+     *
+     * @param size initial size of the internal buffer
+     */
+    public IgniteUnsafeDataOutput(int size) {
+        this(size, NO_AUTO_SHRINK);
+    }
+
+    /**
+     * Creates an output with a buffer of the given size.
+     *
+     * @param size initial size of the internal buffer
+     * @param shrinkCheckFrequencyMs interval (in milliseconds) between checks of whether the byte buffer should be shrunk;
+     *     {@code -1} turns auto-shrinking off
+     */
+    public IgniteUnsafeDataOutput(int size, long shrinkCheckFrequencyMs) {
+        this(shrinkCheckFrequencyMs);
+
+        this.bytes = new byte[size];
+    }
+
+    /**
+     * Replaces the internal buffer and positions the write offset in it.
+     *
+     * @param bytes Array to write to.
+     * @param off Offset at which the next write starts.
+     */
+    public void bytes(byte[] bytes, int off) {
+        this.off = off;
+        this.bytes = bytes;
+    }
+
+    /**
+     * Attaches the {@link OutputStream} that receives written data and rewinds the buffer offset to zero.
+     *
+     * @param out Underlying output stream.
+     */
+    @Override
+    public void outputStream(OutputStream out) {
+        off = 0;
+
+        this.out = out;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The result is a fresh array holding the first {@code offset()} bytes of the internal buffer.
+     */
+    @Override
+    public byte[] array() {
+        int len = off;
+
+        byte[] copy = new byte[len];
+
+        System.arraycopy(bytes, 0, copy, 0, len);
+
+        return copy;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>No copy is made: this is the buffer the output writes into.
+     */
+    @Override
+    public byte[] internalArray() {
+        return this.bytes;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Returns the current write offset in the internal buffer.
+     */
+    @Override
+    public int offset() {
+        return this.off;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The value is stored as is, without any validation.
+     */
+    @Override
+    public void offset(int off) {
+        this.off = off;
+    }
+
+ /**
+ * Makes sure that {@code size} more bytes can be written at the current offset, growing the buffer if it is
+ * too small. When auto-shrinking is enabled and no growth is needed, once per {@code shrinkCheckFrequencyMs}
+ * the buffer is halved if the largest required size seen since the previous check is below half of its length.
+ *
+ * <p>NOTE(review): {@code bytes} is dereferenced unconditionally, so an output created without a buffer
+ * presumably must get one via {@code bytes(byte[], int)} before the first write - confirm.
+ *
+ * @param size Number of bytes about to be written.
+ * @throws IOException If the required buffer size exceeds the maximum allowed array size.
+ */
+ private void requestFreeSize(int size) throws IOException {
+ // An int overflow of 'off + size' yields a negative value, which canBeAllocated() rejects as well.
+ if (!canBeAllocated(off + size)) {
+ throw new IOException("Failed to allocate required memory (byte array size overflow detected) "
+ + "[length=" + size + ", offset=" + off + ']');
+ }
+
+ // From here on 'size' is the total buffer length required.
+ size = off + size;
+
+ maxOff = Math.max(maxOff, size);
+
+ if (size > bytes.length) {
+ int newSize = size << 1;
+
+ // Doubling may overflow or exceed the limit; fall back to the largest allowed size.
+ if (!canBeAllocated(newSize)) {
+ newSize = MAX_BYTE_ARRAY_SIZE;
+ }
+
+ bytes = Arrays.copyOf(bytes, newSize); // Grow.
+ } else if (isAutoShrinkEnabled()) {
+ long now = FastTimestamps.coarseCurrentTimeMillis();
+
+ if (now - lastAutoShrinkCheckTimestamp > shrinkCheckFrequencyMs) {
+ int halfSize = bytes.length >> 1;
+
+ if (maxOff < halfSize) {
+ bytes = Arrays.copyOf(bytes, halfSize); // Shrink.
+ }
+
+ maxOff = 0;
+ lastAutoShrinkCheckTimestamp = now;
+ }
+ }
+ }
+
+    /** Tells whether periodic shrinking of the internal buffer is turned on. */
+    private boolean isAutoShrinkEnabled() {
+        return !(shrinkCheckFrequencyMs == NO_AUTO_SHRINK);
+    }
+
+    /**
+     * Checks that the given size is a legal length for a byte array, i.e. that {@code new byte[size]} would not fail
+     * because of the size itself (given enough heap space).
+     *
+     * @param size Size of potential byte array to check.
+     * @return {@code true} if the size is non-negative and does not exceed {@link #MAX_BYTE_ARRAY_SIZE}.
+     * @see IgniteUnsafeDataOutput#MAX_BYTE_ARRAY_SIZE
+     */
+    private boolean canBeAllocated(long size) {
+        if (size < 0) {
+            return false;
+        }
+
+        return size <= MAX_BYTE_ARRAY_SIZE;
+    }
+
+    /**
+     * Completes a write of {@code size} bytes: without an {@link OutputStream} the buffer offset is moved forward;
+     * with one, the first {@code size} bytes of the buffer are pushed to the stream.
+     *
+     * @param size Number of bytes just put into the buffer.
+     * @throws IOException In case of error.
+     */
+    private void onWrite(int size) throws IOException {
+        if (out == null) {
+            off += size;
+
+            return;
+        }
+
+        out.write(bytes, 0, size);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Same as {@link #writeByte(int)}.
+     */
+    @Override
+    public void write(int b) throws IOException {
+        writeByte(b);
+    }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(byte[] b) throws IOException {
+ requestFreeSize(b.length);
+
+ System.arraycopy(b, 0, bytes, off, b.length);
+
+ onWrite(b.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ requestFreeSize(len);
+
+ System.arraycopy(b, off, bytes, this.off, len);
+
+ onWrite(len);
+ }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeDoubleArray(double[] arr) throws IOException {
+        checkArrayAllocationOverflow(8, arr.length, "double");
+
+        int byteCnt = arr.length << 3;
+
+        requestFreeSize(byteCnt);
+
+        if (!BIG_ENDIAN) {
+            // Native order is already little-endian: a raw memory copy is enough.
+            GridUnsafe.copyMemory(arr, DOUBLE_ARR_OFF, bytes, BYTE_ARR_OFF + off, byteCnt);
+        } else {
+            long addr = BYTE_ARR_OFF + this.off;
+
+            for (int i = 0; i < arr.length; i++, addr += 8) {
+                GridUnsafe.putDoubleLittleEndian(bytes, addr, arr[i]);
+            }
+        }
+
+        onWrite(byteCnt);
+    }
+
+    /** Stores an {@code int} in little-endian order at the given raw offset of the internal buffer. */
+    private void putInt(int val, long off) {
+        if (!BIG_ENDIAN) {
+            GridUnsafe.putInt(bytes, off, val);
+
+            return;
+        }
+
+        GridUnsafe.putIntLittleEndian(bytes, off, val);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Elements are written one by one via {@link #writeBoolean(boolean)}.
+     */
+    @Override
+    public void writeBooleanArray(boolean[] arr) throws IOException {
+        for (boolean val : arr) {
+            writeBoolean(val);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeCharArray(char[] arr) throws IOException {
+        checkArrayAllocationOverflow(2, arr.length, "char");
+
+        int byteCnt = arr.length << 1;
+
+        requestFreeSize(byteCnt);
+
+        if (!BIG_ENDIAN) {
+            // Native order is already little-endian: a raw memory copy is enough.
+            GridUnsafe.copyMemory(arr, CHAR_ARR_OFF, bytes, BYTE_ARR_OFF + off, byteCnt);
+        } else {
+            long addr = BYTE_ARR_OFF + this.off;
+
+            for (int i = 0; i < arr.length; i++, addr += 2) {
+                GridUnsafe.putCharLittleEndian(bytes, addr, arr[i]);
+            }
+        }
+
+        onWrite(byteCnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeLongArray(long[] arr) throws IOException {
+        checkArrayAllocationOverflow(8, arr.length, "long");
+
+        int byteCnt = arr.length << 3;
+
+        requestFreeSize(byteCnt);
+
+        if (!BIG_ENDIAN) {
+            // Native order is already little-endian: a raw memory copy is enough.
+            GridUnsafe.copyMemory(arr, LONG_ARR_OFF, bytes, BYTE_ARR_OFF + off, byteCnt);
+        } else {
+            long addr = BYTE_ARR_OFF + this.off;
+
+            for (int i = 0; i < arr.length; i++, addr += 8) {
+                GridUnsafe.putLongLittleEndian(bytes, addr, arr[i]);
+            }
+        }
+
+        onWrite(byteCnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeFloatArray(float[] arr) throws IOException {
+        checkArrayAllocationOverflow(4, arr.length, "float");
+
+        int byteCnt = arr.length << 2;
+
+        requestFreeSize(byteCnt);
+
+        if (!BIG_ENDIAN) {
+            // Native order is already little-endian: a raw memory copy is enough.
+            GridUnsafe.copyMemory(arr, FLOAT_ARR_OFF, bytes, BYTE_ARR_OFF + off, byteCnt);
+        } else {
+            long addr = BYTE_ARR_OFF + this.off;
+
+            for (int i = 0; i < arr.length; i++, addr += 4) {
+                GridUnsafe.putFloatLittleEndian(bytes, addr, arr[i]);
+            }
+        }
+
+        onWrite(byteCnt);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Detaches the stream and rewinds the offset; the internal buffer is kept.
+     */
+    @Override
+    public void cleanup() {
+        out = null;
+
+        off = 0;
+    }
+
+ /** {@inheritDoc} */
+ @Override
+ public void writeByteArray(byte[] arr) throws IOException {
+ requestFreeSize(arr.length);
+
+ System.arraycopy(arr, 0, bytes, off, arr.length);
+
+ onWrite(arr.length);
+ }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeShortArray(short[] arr) throws IOException {
+        checkArrayAllocationOverflow(2, arr.length, "short");
+
+        int byteCnt = arr.length << 1;
+
+        requestFreeSize(byteCnt);
+
+        if (!BIG_ENDIAN) {
+            // Native order is already little-endian: a raw memory copy is enough.
+            GridUnsafe.copyMemory(arr, SHORT_ARR_OFF, bytes, BYTE_ARR_OFF + off, byteCnt);
+        } else {
+            long addr = BYTE_ARR_OFF + this.off;
+
+            for (int i = 0; i < arr.length; i++, addr += 2) {
+                GridUnsafe.putShortLittleEndian(bytes, addr, arr[i]);
+            }
+        }
+
+        onWrite(byteCnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeIntArray(int[] arr) throws IOException {
+        checkArrayAllocationOverflow(4, arr.length, "int");
+
+        int byteCnt = arr.length << 2;
+
+        requestFreeSize(byteCnt);
+
+        if (!BIG_ENDIAN) {
+            // Native order is already little-endian: a raw memory copy is enough.
+            GridUnsafe.copyMemory(arr, INT_ARR_OFF, bytes, BYTE_ARR_OFF + off, byteCnt);
+        } else {
+            long addr = BYTE_ARR_OFF + this.off;
+
+            for (int i = 0; i < arr.length; i++, addr += 4) {
+                GridUnsafe.putIntLittleEndian(bytes, addr, arr[i]);
+            }
+        }
+
+        onWrite(byteCnt);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Only performs {@link #cleanup()}; an attached stream is detached but neither flushed nor closed here.
+     */
+    @Override
+    public void close() throws IOException {
+        cleanup();
+    }
+
+ /** {@inheritDoc} */
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ requestFreeSize(1);
+
+ GridUnsafe.putBoolean(bytes, BYTE_ARR_OFF + off, v);
+
+ onWrite(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void writeByte(int v) throws IOException {
+ requestFreeSize(1);
+
+ putByte((byte) v, BYTE_ARR_OFF + off);
+
+ onWrite(1);
+ }
+
+    /** Stores a byte at the given raw offset of the internal buffer. */
+    private void putByte(byte val, long off) {
+        GridUnsafe.putByte(this.bytes, off, val);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Only the two low-order bytes of {@code v} are written.
+     */
+    @Override
+    public void writeShort(int v) throws IOException {
+        requestFreeSize(2);
+
+        putShort((short) v, BYTE_ARR_OFF + this.off);
+
+        onWrite(2);
+    }
+
+    /** Stores a {@code short} in little-endian order at the given raw offset of the internal buffer. */
+    private void putShort(short val, long off) {
+        if (!BIG_ENDIAN) {
+            GridUnsafe.putShort(bytes, off, val);
+
+            return;
+        }
+
+        GridUnsafe.putShortLittleEndian(bytes, off, val);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Only the two low-order bytes of {@code v} are written.
+     */
+    @Override
+    public void writeChar(int v) throws IOException {
+        requestFreeSize(2);
+
+        long addr = BYTE_ARR_OFF + this.off;
+
+        char c = (char) v;
+
+        if (!BIG_ENDIAN) {
+            GridUnsafe.putChar(bytes, addr, c);
+        } else {
+            GridUnsafe.putCharLittleEndian(bytes, addr, c);
+        }
+
+        onWrite(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeInt(int v) throws IOException {
+        requestFreeSize(4);
+
+        putInt(v, BYTE_ARR_OFF + this.off);
+
+        onWrite(4);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeLong(long v) throws IOException {
+        requestFreeSize(8);
+
+        long addr = BYTE_ARR_OFF + this.off;
+
+        if (!BIG_ENDIAN) {
+            GridUnsafe.putLong(bytes, addr, v);
+        } else {
+            GridUnsafe.putLongLittleEndian(bytes, addr, v);
+        }
+
+        onWrite(8);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The value is converted with {@link Float#floatToIntBits(float)} and written as an {@code int}.
+     */
+    @Override
+    public void writeFloat(float v) throws IOException {
+        writeInt(Float.floatToIntBits(v));
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The value is converted with {@link Double#doubleToLongBits(double)} and written as a {@code long}.
+     */
+    @Override
+    public void writeDouble(double v) throws IOException {
+        writeLong(Double.doubleToLongBits(v));
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>A string whose UTF length equals its char count is written through the fast ASCII path; otherwise
+     * the low-order byte of each char is written individually.
+     */
+    @Override
+    public void writeBytes(String s) throws IOException {
+        int len = s.length();
+
+        if (utfLength(s) != len) {
+            for (int i = 0; i < len; i++) {
+                writeByte(s.charAt(i));
+            }
+
+            return;
+        }
+
+        writeAsciiStringBytes(s);
+    }
+
+    /** Writes the bytes that {@code StringIntrospection.fastAsciiBytes} returns for the string, as a byte array. */
+    private void writeAsciiStringBytes(String s) throws IOException {
+        byte[] asciiBytes = StringIntrospection.fastAsciiBytes(s);
+
+        writeByteArray(asciiBytes);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Each char is written via {@link #writeChar(int)}.
+     */
+    @Override
+    public void writeChars(String s) throws IOException {
+        for (int i = 0, cnt = s.length(); i < cnt; i++) {
+            char c = s.charAt(i);
+
+            writeChar(c);
+        }
+    }
+
+ /** {@inheritDoc} */
+ @Override
+ public void writeUTF(String s) throws IOException {
+ writeUtf(s, utfLength(s));
+ }
+
+    /**
+     * Writes the given string in UTF format: first the encoded length as a variable-length unsigned int, then
+     * the encoded characters. The caller supplies the already computed UTF length, which spares a second scan
+     * of the string.
+     *
+     * @param s String.
+     * @param utfLen Length of the UTF encoding of {@code s}, in bytes.
+     * @throws IOException In case of error.
+     */
+    private void writeUtf(String s, int utfLen) throws IOException {
+        VarInts.writeUnsignedInt(utfLen, this);
+
+        // Encoded length equal to the char count means the fast ASCII path can be used.
+        boolean oneBytePerChar = utfLen == s.length();
+
+        if (!oneBytePerChar) {
+            writeUtfBody(s);
+
+            return;
+        }
+
+        writeAsciiStringBytes(s);
+    }
+
+    /**
+     * Guards against arithmetic overflow when a very large array is about to be serialized: the total byte count
+     * is computed as a {@code long} and must be a legal byte array size.
+     *
+     * @param bytes Number of bytes in a single array element.
+     * @param arrLen Array length.
+     * @param type Element type name, used in the error message.
+     * @throws IOException If overflow is present and data corruption could occur.
+     */
+    private void checkArrayAllocationOverflow(int bytes, int arrLen, String type) throws IOException {
+        long totalBytes = (long) arrLen * bytes;
+
+        if (canBeAllocated(totalBytes)) {
+            return;
+        }
+
+        throw new IOException("Failed to allocate required memory for " + type + " array "
+                + "(byte array size overflow detected) [length=" + arrLen + ']');
+    }
+
+    /**
+     * Computes how many bytes the UTF encoding of the given string takes: 1 byte for chars in
+     * {@code [0x0001, 0x007F]}, 3 bytes for chars above {@code 0x07FF}, 2 bytes for the rest (including {@code 0}).
+     * The string is scanned in chunks copied to the shared char buffer.
+     *
+     * @param s String.
+     * @return UTF encoding length.
+     */
+    private int utfLength(String s) {
+        int total = s.length();
+        int res = 0;
+
+        int done = 0;
+
+        while (done < total) {
+            int chunk = Math.min(total - done, CHAR_BUF_SIZE);
+
+            s.getChars(done, done + chunk, cbuf, 0);
+
+            for (int i = 0; i < chunk; i++) {
+                char c = cbuf[i];
+
+                if (c == 0 || c > 0x007F) {
+                    res += c > 0x07FF ? 3 : 2;
+                } else {
+                    res++;
+                }
+            }
+
+            done += chunk;
+        }
+
+        return res;
+    }
+
+    /**
+     * Writes the "body" (i.e., the UTF representation minus the length header) of the UTF encoding for the
+     * given string, one encoded byte at a time. The string is processed in chunks copied to the shared char buffer.
+     *
+     * @param s String.
+     * @throws IOException In case of error.
+     */
+    private void writeUtfBody(String s) throws IOException {
+        int total = s.length();
+
+        int done = 0;
+
+        while (done < total) {
+            int chunk = Math.min(total - done, CHAR_BUF_SIZE);
+
+            s.getChars(done, done + chunk, cbuf, 0);
+
+            for (int i = 0; i < chunk; i++) {
+                char c = cbuf[i];
+
+                if (c != 0 && c <= 0x007F) {
+                    // 1 byte format: 0xxxxxxx
+                    write(c);
+                } else if (c <= 0x07FF) {
+                    // 2 byte format: 110xxxxx 10xxxxxx (also used for the zero char)
+                    write(0xC0 | ((c >> 6) & 0x1F));
+                    write(0x80 | ((c) & 0x3F));
+                } else {
+                    // 3 byte format: 1110xxxx 10xxxxxx 10xxxxxx
+                    write(0xE0 | ((c >> 12) & 0x0F));
+                    write(0x80 | ((c >> 6) & 0x3F));
+                    write(0x80 | ((c) & 0x3F));
+                }
+            }
+
+            done += chunk;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Does nothing when no stream is attached.
+     */
+    @Override
+    public void flush() throws IOException {
+        if (out == null) {
+            return;
+        }
+
+        out.flush();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Built by {@link IgniteToStringBuilder}.
+     */
+    @Override
+    public String toString() {
+        return IgniteToStringBuilder.toString(IgniteUnsafeDataOutput.class, this);
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/VarInts.java b/modules/core/src/main/java/org/apache/ignite/internal/util/io/VarInts.java
similarity index 86%
rename from modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/VarInts.java
rename to modules/core/src/main/java/org/apache/ignite/internal/util/io/VarInts.java
index f3a70c4..98e7aff 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/VarInts.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/io/VarInts.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network.serialization.marshal;
+package org.apache.ignite.internal.util.io;
import java.io.DataInput;
import java.io.DataOutput;
@@ -24,23 +24,23 @@ import java.io.IOException;
/**
* Utils to read/write variable length ints.
*/
-class VarInts {
+public class VarInts {
private VarInts() {
}
/**
- * Writes a unsigned int using variable length format. If it's less than 0xFF, it's written as one byte.
+ * Writes an unsigned int using variable length format. If it's less than 0xFF, it's written as one byte.
* If it's more than 0xFE, but less than 0xFFFF, it's written as 3 bytes: first one byte equal to 0xFF, then 2 bytes
* as {@link DataOutput#writeShort(int)} writes them. Otherwise, it writes 3 0xFF bytes, then writes
* {@link DataOutput#writeInt(int)}.
* This may be beneficial for the cases when we need to write an unsigned int, but most of the time the values
- * are small.
+ * are small (for example, when writing an array/collection/string length).
*
* @param value value to write
* @param output where to write to value to
* @throws IOException if an I/O error occurs
*/
- static void writeUnsignedInt(int value, DataOutput output) throws IOException {
+ public static void writeUnsignedInt(int value, DataOutput output) throws IOException {
if (value < 0) {
throw new IllegalArgumentException(value + " is negative");
}
@@ -65,7 +65,7 @@ class VarInts {
* @throws IOException if an I/O error occurs
* @see #writeUnsignedInt(int, DataOutput)
*/
- static int readUnsignedInt(DataInput input) throws IOException {
+ public static int readUnsignedInt(DataInput input) throws IOException {
int first = input.readUnsignedByte();
if (first < 0xFF) {
return first;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/StringIntrospectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/StringIntrospectionTest.java
new file mode 100644
index 0000000..8e96566
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/StringIntrospectionTest.java
@@ -0,0 +1,63 @@
+package org.apache.ignite.internal.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static java.nio.charset.StandardCharsets.ISO_8859_1;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+class StringIntrospectionTest {
+
+ private static final String ASCII = "ASCII";
+ private static final String NOT_ASCII_BUT_LATIN1 = "Not ASCII: é";
+
+ @Test
+ void asciiStringsSupportFastGetLatin1Bytes() {
+ assertTrue(StringIntrospection.supportsFastGetLatin1Bytes(ASCII));
+ }
+
+ @Test
+ void latin1StringsSupportsFastGetLatin1Bytes() {
+ assertTrue(StringIntrospection.supportsFastGetLatin1Bytes(NOT_ASCII_BUT_LATIN1));
+ }
+
+ @Test
+ void nonLatin1StringsDoNotSupportFastGetLatin1Bytes() {
+ assertFalse(StringIntrospection.supportsFastGetLatin1Bytes("Not Latin1: кириллица"));
+ }
+
+ @Test
+ void fastAsciiBytesReturnsCorrectBytes() {
+ byte[] asciiBytes = StringIntrospection.fastAsciiBytes(ASCII);
+
+ assertThat(asciiBytes, is(equalTo(ASCII.getBytes(UTF_8))));
+ }
+
+ @Test
+ void fastLatin1BytesReturnsCorrectBytes() {
+ byte[] asciiBytes = StringIntrospection.fastLatin1Bytes(NOT_ASCII_BUT_LATIN1);
+
+ assertThat(asciiBytes, is(equalTo(NOT_ASCII_BUT_LATIN1.getBytes(ISO_8859_1))));
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteTestIoUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteTestIoUtils.java
new file mode 100644
index 0000000..f04bdbe
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteTestIoUtils.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+/**
+ * IO test utilities.
+ */
+public final class IgniteTestIoUtils {
+
+ /**
+ * Gets a short value from the start of a byte array, assuming that the value is stored in little-endian byte order.
+ *
+ * @param arr Byte array.
+ * @return Short value decoded from the two bytes starting at index 0.
+ */
+ public static short getShortByByteLittleEndian(byte[] arr) {
+ return getShortByByteLittleEndian(arr, 0);
+ }
+
+ /**
+ * Gets a short value from a byte array, assuming that the value is stored in little-endian byte order.
+ *
+ * @param arr Byte array.
+ * @param off Offset of the first (least significant) byte.
+ * @return Short value decoded from the two bytes starting at {@code off}.
+ */
+ public static short getShortByByteLittleEndian(byte[] arr, int off) {
+ return (short) ((arr[off] & 0xff) | arr[off + 1] << 8);
+ }
+
+ /**
+ * Gets a char value from the start of a byte array, assuming that the value is stored in little-endian byte order.
+ *
+ * @param arr Byte array.
+ * @return Char value decoded from the two bytes starting at index 0.
+ */
+ public static char getCharByByteLittleEndian(byte[] arr) {
+ return getCharByByteLittleEndian(arr, 0);
+ }
+
+ /**
+ * Gets a char value from a byte array, assuming that the value is stored in little-endian byte order.
+ *
+ * @param arr Byte array.
+ * @param off Offset of the first (least significant) byte.
+ * @return Char value decoded from the two bytes starting at {@code off}.
+ */
+ public static char getCharByByteLittleEndian(byte[] arr, int off) {
+ return (char) ((arr[off] & 0xff) | arr[off + 1] << 8);
+ }
+
+ /**
+ * Gets an integer value from the start of a byte array, assuming that the value is stored in little-endian byte order.
+ *
+ * @param arr Byte array.
+ * @return Integer value decoded from the four bytes starting at index 0.
+ */
+ public static int getIntByByteLittleEndian(byte[] arr) {
+ return getIntByByteLittleEndian(arr, 0);
+ }
+
+ /**
+ * Gets an integer value from a byte array, assuming that the value is stored in little-endian byte order.
+ *
+ * @param arr Byte array.
+ * @param off Offset of the first (least significant) byte.
+ * @return Integer value decoded from the four bytes starting at {@code off}.
+ */
+ public static int getIntByByteLittleEndian(byte[] arr, int off) {
+ return ((int) arr[off] & 0xff) | ((int) arr[off + 1] & 0xff) << 8
+ | ((int) arr[off + 2] & 0xff) << 16 | ((int) arr[off + 3] & 0xff) << 24;
+ }
+
+ /**
+ * Gets a long value from the start of a byte array, assuming that the value is stored in little-endian byte order.
+ *
+ * @param arr Byte array.
+ * @return Long value decoded from the eight bytes starting at index 0.
+ */
+ public static long getLongByByteLittleEndian(byte[] arr) {
+ return getLongByByteLittleEndian(arr, 0);
+ }
+
+ /**
+ * Gets a long value from a byte array, assuming that the value is stored in little-endian byte order.
+ *
+ * @param arr Byte array.
+ * @param off Offset of the first (least significant) byte.
+ * @return Long value decoded from the eight bytes starting at {@code off}.
+ */
+ public static long getLongByByteLittleEndian(byte[] arr, int off) {
+ return ((long) arr[off] & 0xff) | ((long) arr[off + 1] & 0xff) << 8 | ((long) arr[off + 2] & 0xff) << 16
+ | ((long) arr[off + 3] & 0xff) << 24 | ((long) arr[off + 4] & 0xff) << 32 | ((long) arr[off + 5] & 0xff) << 40
+ | ((long) arr[off + 6] & 0xff) << 48 | ((long) arr[off + 7] & 0xff) << 56;
+ }
+
+ /**
+ * Gets a float value from the start of a byte array, assuming that the value is stored in little-endian byte order.
+ *
+ * @param arr Byte array.
+ * @return Float value whose bit pattern is the little-endian integer read from the four bytes starting at index 0.
+ */
+ public static float getFloatByByteLittleEndian(byte[] arr) {
+ return getFloatByByteLittleEndian(arr, 0);
+ }
+
+ /**
+ * Gets a float value from a byte array, assuming that the value is stored in little-endian byte order.
+ *
+ * @param arr Byte array.
+ * @param off Offset of the first (least significant) byte.
+ * @return Float value whose bit pattern is the little-endian integer read from the four bytes starting at {@code off}.
+ */
+ public static float getFloatByByteLittleEndian(byte[] arr, int off) {
+ return Float.intBitsToFloat(getIntByByteLittleEndian(arr, off));
+ }
+
+ /**
+ * Gets a double value from the start of a byte array, assuming that the value is stored in little-endian byte order.
+ *
+ * @param arr Byte array.
+ * @return Double value whose bit pattern is the little-endian long read from the eight bytes starting at index 0.
+ */
+ public static double getDoubleByByteLittleEndian(byte[] arr) {
+ return getDoubleByByteLittleEndian(arr, 0);
+ }
+
+ /**
+ * Gets a double value from a byte array, assuming that the value is stored in little-endian byte order.
+ *
+ * @param arr Byte array.
+ * @param off Offset of the first (least significant) byte.
+ * @return Double value whose bit pattern is the little-endian long read from the eight bytes starting at {@code off}.
+ */
+ public static double getDoubleByByteLittleEndian(byte[] arr, int off) {
+ return Double.longBitsToDouble(getLongByByteLittleEndian(arr, off));
+ }
+
+ /**
+ * Prevents instantiation: this class only hosts static helpers.
+ */
+ private IgniteTestIoUtils() {
+ // No-op.
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInputOutputByteOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInputOutputByteOrderTest.java
new file mode 100644
index 0000000..afdfe79
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInputOutputByteOrderTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import static org.apache.ignite.internal.util.io.IgniteTestIoUtils.getCharByByteLittleEndian;
+import static org.apache.ignite.internal.util.io.IgniteTestIoUtils.getDoubleByByteLittleEndian;
+import static org.apache.ignite.internal.util.io.IgniteTestIoUtils.getFloatByByteLittleEndian;
+import static org.apache.ignite.internal.util.io.IgniteTestIoUtils.getIntByByteLittleEndian;
+import static org.apache.ignite.internal.util.io.IgniteTestIoUtils.getLongByByteLittleEndian;
+import static org.apache.ignite.internal.util.io.IgniteTestIoUtils.getShortByByteLittleEndian;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.util.Random;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Ignite unsafe data input/output byte order sanity tests.
+ */
+class IgniteUnsafeDataInputOutputByteOrderTest {
+ /** Array length. */
+ private static final int ARR_LEN = 16;
+
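+ /** Offset, in bytes, of the first array element in the output buffer (zero: array data is expected to start at index 0). */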
+ private static final int LEN_BYTES = 0;
+
+ /** Source of random test values. */
+ private static final Random RND = new Random();
+
+ /** Output under test. */
+ private IgniteUnsafeDataOutput out;
+
+ /** Input under test; it reads from the internal array of {@link #out}. */
+ private IgniteUnsafeDataInput in;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ out = new IgniteUnsafeDataOutput(16 * 8 + LEN_BYTES);
+ in = new IgniteUnsafeDataInput();
+ in.inputStream(new ByteArrayInputStream(out.internalArray()));
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ in.close();
+ out.close();
+ }
+
+ @Test
+ public void testShort() throws Exception {
+ short val = (short) RND.nextLong();
+
+ out.writeShort(val);
+
+ assertEquals(val, IgniteTestIoUtils.getShortByByteLittleEndian(out.internalArray()));
+ assertEquals(val, in.readShort());
+ }
+
+ @Test
+ public void testShortArray() throws Exception {
+ short[] arr = new short[ARR_LEN];
+
+ for (int i = 0; i < ARR_LEN; i++) {
+ arr[i] = (short) RND.nextLong();
+ }
+
+ out.writeShortArray(arr);
+
+ byte[] outArr = out.internalArray();
+
+ for (int i = 0; i < ARR_LEN; i++) {
+ assertEquals(arr[i], getShortByByteLittleEndian(outArr, i * 2 + LEN_BYTES));
+ }
+
+ assertArrayEquals(arr, in.readShortArray(ARR_LEN));
+ }
+
+ @Test
+ public void testChar() throws Exception {
+ char val = (char) RND.nextLong();
+
+ out.writeChar(val);
+
+ assertEquals(val, IgniteTestIoUtils.getCharByByteLittleEndian(out.internalArray()));
+ assertEquals(val, in.readChar());
+ }
+
+ @Test
+ public void testCharArray() throws Exception {
+ char[] arr = new char[ARR_LEN];
+
+ for (int i = 0; i < ARR_LEN; i++) {
+ arr[i] = (char) RND.nextLong();
+ }
+
+ out.writeCharArray(arr);
+
+ byte[] outArr = out.internalArray();
+
+ for (int i = 0; i < ARR_LEN; i++) {
+ assertEquals(arr[i], getCharByByteLittleEndian(outArr, i * 2 + LEN_BYTES));
+ }
+
+ assertArrayEquals(arr, in.readCharArray(ARR_LEN));
+ }
+
+ @Test
+ public void testInt() throws Exception {
+ int val = RND.nextInt();
+
+ out.writeInt(val);
+
+ assertEquals(val, IgniteTestIoUtils.getIntByByteLittleEndian(out.internalArray()));
+ assertEquals(val, in.readInt());
+ }
+
+ @Test
+ public void testIntArray() throws Exception {
+ int[] arr = new int[ARR_LEN];
+
+ for (int i = 0; i < ARR_LEN; i++) {
+ arr[i] = RND.nextInt();
+ }
+
+ out.writeIntArray(arr);
+
+ byte[] outArr = out.internalArray();
+
+ for (int i = 0; i < ARR_LEN; i++) {
+ assertEquals(arr[i], getIntByByteLittleEndian(outArr, i * 4 + LEN_BYTES));
+ }
+
+ assertArrayEquals(arr, in.readIntArray(ARR_LEN));
+ }
+
+ @Test
+ public void testLong() throws Exception {
+ long val = RND.nextLong();
+
+ out.writeLong(val);
+
+ assertEquals(val, IgniteTestIoUtils.getLongByByteLittleEndian(out.internalArray()));
+ assertEquals(val, in.readLong());
+ }
+
+ @Test
+ public void testLongArray() throws Exception {
+ long[] arr = new long[ARR_LEN];
+
+ for (int i = 0; i < ARR_LEN; i++) {
+ arr[i] = RND.nextLong();
+ }
+
+ out.writeLongArray(arr);
+
+ byte[] outArr = out.internalArray();
+
+ for (int i = 0; i < ARR_LEN; i++) {
+ assertEquals(arr[i], getLongByByteLittleEndian(outArr, i * 8 + LEN_BYTES));
+ }
+
+ assertArrayEquals(arr, in.readLongArray(ARR_LEN));
+ }
+
+ @Test
+ public void testFloat() throws Exception {
+ float val = RND.nextFloat();
+
+ out.writeFloat(val);
+
+ assertEquals(val, IgniteTestIoUtils.getFloatByByteLittleEndian(out.internalArray()), 0);
+ assertEquals(val, in.readFloat(), 0);
+ }
+
+ @Test
+ public void testFloatArray() throws Exception {
+ float[] arr = new float[ARR_LEN];
+
+ for (int i = 0; i < ARR_LEN; i++) {
+ arr[i] = RND.nextFloat();
+ }
+
+ out.writeFloatArray(arr);
+
+ byte[] outArr = out.internalArray();
+
+ for (int i = 0; i < ARR_LEN; i++) {
+ assertEquals(arr[i], getFloatByByteLittleEndian(outArr, i * 4 + LEN_BYTES), 0);
+ }
+
+ assertArrayEquals(arr, in.readFloatArray(ARR_LEN), 0);
+ }
+
+ @Test
+ public void testDouble() throws Exception {
+ double val = RND.nextDouble();
+
+ out.writeDouble(val);
+
+ assertEquals(val, IgniteTestIoUtils.getDoubleByByteLittleEndian(out.internalArray()), 0);
+ assertEquals(val, in.readDouble(), 0);
+ }
+
+ @Test
+ public void testDoubleArray() throws Exception {
+ double[] arr = new double[ARR_LEN];
+
+ for (int i = 0; i < ARR_LEN; i++) {
+ arr[i] = RND.nextDouble();
+ }
+
+ out.writeDoubleArray(arr);
+
+ byte[] outArr = out.internalArray();
+
+ for (int i = 0; i < ARR_LEN; i++) {
+ assertEquals(arr[i], getDoubleByByteLittleEndian(outArr, i * 8 + LEN_BYTES), 0);
+ }
+
+ assertArrayEquals(arr, in.readDoubleArray(ARR_LEN), 0);
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInputTest.java
similarity index 54%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
copy to modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInputTest.java
index f7b2ede..8038459 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataInputTest.java
@@ -15,23 +15,26 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network.serialization.marshal;
+package org.apache.ignite.internal.util.io;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
-/**
- * Knows how to write a value to a {@link DataOutputStream}.
- */
-interface ValueWriter<T> {
- /**
- * Writes the given value to a {@link DataOutputStream}.
- *
- * @param value value to write
- * @param output where to write to
- * @param context marshalling context
- * @throws IOException if an I/O problem occurs
- * @throws MarshalException if another problem occurs
- */
- void write(T value, DataOutputStream output, MarshallingContext context) throws IOException, MarshalException;
+import org.junit.jupiter.api.Test;
+
+class IgniteUnsafeDataInputTest {
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @Test
+ void readReturnsNegativeValueWhenEndOfStreamIsReached() throws Exception {
+ var input = new IgniteUnsafeDataInput();
+ input.bytes(new byte[]{1, 2, 3}, 3);
+
+ byte[] bytes = new byte[4096];
+ input.read(bytes, 0, bytes.length);
+
+ int read = input.read(bytes, 0, bytes.length);
+
+ assertThat(read, is(lessThan(0)));
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutputArraySizingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutputArraySizingTest.java
new file mode 100644
index 0000000..58b0509
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/io/IgniteUnsafeDataOutputArraySizingTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.io;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for {@link IgniteUnsafeDataOutput} concerning internal buffer sizing.
+ */
+class IgniteUnsafeDataOutputArraySizingTest {
+ /** Small payload: 32 bytes. */
+ private static final byte[] SMALL = new byte[32];
+
+ /** Big payload: 2048 bytes, larger than the initial 512-byte buffer used by the tests. */
+ private static final byte[] BIG = new byte[2048];
+
+ /** Buffer timeout passed as the second constructor argument of {@link IgniteUnsafeDataOutput} (presumably milliseconds). */
+ private static final long BUFFER_TIMEOUT = 1000;
+
+ /** Wait timeout is bigger than the buffer timeout to prevent failures due to time measurement error. */
+ private static final long WAIT_BUFFER_TIMEOUT = BUFFER_TIMEOUT + BUFFER_TIMEOUT / 2;
+
+ @Test
+ public void testShrink() throws Exception {
+ final IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(512, BUFFER_TIMEOUT);
+
+ assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 256), WAIT_BUFFER_TIMEOUT));
+ assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 128), WAIT_BUFFER_TIMEOUT));
+ assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 64), WAIT_BUFFER_TIMEOUT));
+ assertFalse(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 32), WAIT_BUFFER_TIMEOUT));
+ assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 64), WAIT_BUFFER_TIMEOUT));
+ }
+
+ @Test
+ public void testGrow() throws Exception {
+ IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(512);
+
+ out.write(BIG);
+ out.cleanup();
+
+ assertEquals(4096, out.internalArray().length);
+ }
+
+ @Test
+ public void testChanged1() throws Exception {
+ IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(512, BUFFER_TIMEOUT);
+
+ for (int i = 0; i < 100; i++) {
+ Thread.sleep(100);
+
+ out.write(SMALL);
+ out.cleanup();
+ out.write(BIG);
+ out.cleanup();
+ }
+
+ assertEquals(4096, out.internalArray().length);
+ }
+
+ @Test
+ public void testChanged2() throws Exception {
+ final IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(512, BUFFER_TIMEOUT);
+
+ assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 256), WAIT_BUFFER_TIMEOUT));
+
+ out.write(BIG);
+ out.cleanup();
+ assertEquals(4096, out.internalArray().length);
+
+ assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 4096), WAIT_BUFFER_TIMEOUT));
+ assertTrue(IgniteTestUtils.waitForCondition(new WriteAndCheckPredicate(out, SMALL, 2048), 2 * WAIT_BUFFER_TIMEOUT));
+ }
+
+ private static class WriteAndCheckPredicate implements BooleanSupplier {
+ final IgniteUnsafeDataOutput out;
+
+ final byte[] bytes;
+
+ final int len;
+
+ WriteAndCheckPredicate(IgniteUnsafeDataOutput out, byte[] bytes, int len) {
+ this.out = out;
+ this.bytes = bytes;
+ this.len = len;
+ }
+
+ /**
+ * Writes {@code bytes} to {@code out}, calls {@code cleanup()}, prints the current internal array length and
+ * reports whether that length equals {@code len}.
+ *
+ * @return {@code true} if the internal array of {@code out} has exactly {@code len} bytes after the write.
+ * @throws RuntimeException wrapping any exception thrown while writing or cleaning up.
+ */
+ @Override
+ public boolean getAsBoolean() {
+ try {
+ out.write(bytes);
+ out.cleanup();
+
+ System.out.println("L=" + out.internalArray().length);
+
+ return out.internalArray().length == len;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/VarIntsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/io/VarIntsTest.java
similarity index 98%
rename from modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/VarIntsTest.java
rename to modules/core/src/test/java/org/apache/ignite/internal/util/io/VarIntsTest.java
index 744f631..f0c62d5 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/VarIntsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/io/VarIntsTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network.serialization.marshal;
+package org.apache.ignite.internal.util.io;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index 379dec6..c5c97c4 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -71,6 +71,24 @@
<!-- Test dependencies -->
<dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-configuration</artifactId>
<type>test-jar</type>
@@ -146,6 +164,12 @@
<artifactId>ignite-network-annotation-processor</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <version>${jmh.framework.version}</version>
+ </dependency>
</dependencies>
<configuration>
<annotationProcessorPaths>
@@ -154,6 +178,11 @@
<artifactId>ignite-network-annotation-processor</artifactId>
<version>${project.version}</version>
</path>
+ <path>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <version>${jmh.framework.version}</version>
+ </path>
</annotationProcessorPaths>
</configuration>
</plugin>
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
index e84a474..7dd46b8 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
@@ -1380,7 +1380,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
MarshalledObject res = serializationService.writeMarshallable(object);
marshallable = res.bytes();
// Get descriptors that were not previously sent to the remote node
- descriptors = serializationService.createClassDescriptorsMessages(res.usedDescriptors());
+ descriptors = serializationService.createClassDescriptorsMessages(res.usedDescriptorIds());
}
writeCollection(descriptors, MessageCollectionItemType.MSG, writer);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInType.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInType.java
index 56beee6..30c523c 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInType.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInType.java
@@ -76,7 +76,8 @@ public enum BuiltInType {
NULL(38, Null.class),
REFERENCE(39, ReferencePlaceholder.class),
CLASS(40, Class.class),
- PROXY(41, Proxy.class)
+ PROXY(41, Proxy.class),
+ STRING_LATIN1(42, StringLatin1Placeholder.class)
;
/**
@@ -136,6 +137,6 @@ public enum BuiltInType {
private static class ReferencePlaceholder {
}
- private static class NotNullPlaceholder {
+ private static class StringLatin1Placeholder {
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInTypeIds.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInTypeIds.java
index fe1380b..09c074b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInTypeIds.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/BuiltInTypeIds.java
@@ -30,4 +30,13 @@ public class BuiltInTypeIds {
public static final int DOUBLE = 10;
public static final int BOOLEAN = 12;
public static final int CHAR = 14;
+
+ public static final int BYTE_ARRAY = 21;
+ public static final int SHORT_ARRAY = 22;
+ public static final int INT_ARRAY = 23;
+ public static final int FLOAT_ARRAY = 24;
+ public static final int LONG_ARRAY = 25;
+ public static final int DOUBLE_ARRAY = 26;
+ public static final int BOOLEAN_ARRAY = 27;
+ public static final int CHAR_ARRAY = 28;
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptor.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptor.java
index 16c9602..035388d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptor.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptor.java
@@ -23,6 +23,8 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -99,6 +101,8 @@ public class ClassDescriptor {
/** Indices of non-primitive fields in the object fields array. */
private Object2IntMap<String> objectFieldIndices;
+ private final List<ClassDescriptor> lineage;
+
private final SpecialSerializationMethods serializationMethods;
/**
@@ -124,6 +128,8 @@ public class ClassDescriptor {
fieldNullsBitmapSize = computeFieldNullsBitmapSize(fields);
fieldNullsBitmapIndices = computeFieldNullsBitmapIndices(fields);
+ lineage = computeLineage(this);
+
serializationMethods = new SpecialSerializationMethodsImpl(this);
}
@@ -166,6 +172,20 @@ public class ClassDescriptor {
return Object2IntMaps.unmodifiable(map);
}
+ private static List<ClassDescriptor> computeLineage(ClassDescriptor descriptor) {
+ List<ClassDescriptor> descriptors = new ArrayList<>();
+
+ ClassDescriptor currentDesc = descriptor;
+ while (currentDesc != null) {
+ descriptors.add(currentDesc);
+ currentDesc = currentDesc.superClassDescriptor();
+ }
+
+ Collections.reverse(descriptors);
+
+ return List.copyOf(descriptors);
+ }
+
/**
* Returns descriptor id.
*
@@ -529,6 +549,15 @@ public class ClassDescriptor {
}
/**
+ * Returns the lineage (all the ancestors, from the progenitor (excluding Object) down the line, including this descriptor).
+ *
+ * @return ancestors from the progenitor (excluding Object) down the line, plus this descriptor
+ */
+ public List<ClassDescriptor> lineage() {
+ return lineage;
+ }
+
+ /**
* Returns {@code true} if the descriptor describes an enum class.
*
* @return {@code true} if the descriptor describes an enum class
@@ -538,9 +567,19 @@ public class ClassDescriptor {
}
/**
+ * Returns {@code true} if the descriptor describes a String that is represented with Latin-1 internally.
+ * Needed to apply an optimization.
+ *
+ * @return {@code true} if the descriptor describes a String that is represented with Latin-1 internally
+ */
+ public boolean isLatin1String() {
+ return descriptorId == BuiltInType.STRING_LATIN1.descriptorId();
+ }
+
+ /**
* Returns {@code true} if a field (or array item) of the described class can only host (at runtime) instances of this type
- * (and not subtypes), so the runtime type is known upfront. This is also true for enums, even though technically their values
- * might have subtypes; but we serialize them using their names, so we still treat the type as known upfront.
+ * (and not subtypes), so the runtime marshalling type is known upfront. This is also true for enums, even though technically
+ * their values might have subtypes; but we serialize them using their names, so we still treat the type as known upfront.
*
* @return {@code true} if a field (or array item) of the described class can only host (at runtime) instances of the concrete type
* that is known upfront
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptorRegistry.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptorRegistry.java
index c90f13c..2b36769 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptorRegistry.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptorRegistry.java
@@ -36,6 +36,8 @@ public class ClassDescriptorRegistry implements DescriptorRegistry {
/** Sequential id generator for class descriptors. */
private final AtomicInteger idGenerator = new AtomicInteger(BUILTIN_DESCRIPTORS_OFFSET_COUNT);
+ // TODO: IGNITE-16464 - do not keep references to classes forever
+
/** Map class -> descriptor id. */
private final ConcurrentMap<Class<?>, Integer> idMap = new ConcurrentHashMap<>();
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/Classes.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/Classes.java
index b4e71c8..f450b1d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/Classes.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/Classes.java
@@ -97,8 +97,8 @@ public class Classes {
/**
* Returns {@code true} if a field (or array item) of the described class can only host (at runtime) instances of this type
- * (and not subtypes), so the runtime type is known upfront. This is also true for enums, even though technically their values
- * might have subtypes; but we serialize them using their names, so we still treat the type as known upfront.
+ * (and not subtypes), so the runtime marshalling type is known upfront. This is also true for enums, even though technically
+ * their values might have subtypes; but we serialize them using their names, so we still treat the type as known upfront.
*
* @return {@code true} if a field (or array item) of the described class can only host (at runtime) instances of the concrete type
* that is known upfront
@@ -108,6 +108,11 @@ public class Classes {
return isRuntimeTypeKnownUpfront(clazz.getComponentType());
}
+ if (clazz == String.class) {
+ // a String may be represented with more than one built-in type, so we don't know the type upfront
+ return false;
+ }
+
return clazz.isPrimitive() || Modifier.isFinal(clazz.getModifiers()) || isRuntimeEnum(clazz);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/FieldDescriptor.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/FieldDescriptor.java
index 09f2764..754ab82 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/FieldDescriptor.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/FieldDescriptor.java
@@ -125,8 +125,8 @@ public class FieldDescriptor {
/**
* Returns {@code true} if the field can only host (at runtime) instances of its declared type (and not subtypes),
- * so the runtime type is known upfront. This is also true for enums, even though technically their values might have subtypes;
- * but we serialize them using their names, so we still treat the type as known upfront.
+ * so the runtime marshalling type is known upfront. This is also true for enums, even though technically their values might
+ * have subtypes; but we serialize them using their names, so we still treat the type as known upfront.
*
* @return {@code true} if the field can only host (at runtime) instances of the declared type that is known upfront
*/
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
index d6200fe..7d8989c 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
@@ -27,7 +27,6 @@ import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
@@ -115,8 +114,7 @@ public class PerSessionSerializationService {
* @throws UserObjectSerializationException If failed to serialize an object.
* @see SerializationService#writeMarshallable(Object)
*/
- public <T> MarshalledObject writeMarshallable(T marshallable)
- throws UserObjectSerializationException {
+ public <T> MarshalledObject writeMarshallable(T marshallable) throws UserObjectSerializationException {
return serializationService.writeMarshallable(marshallable);
}
@@ -139,12 +137,13 @@ public class PerSessionSerializationService {
/**
* Creates a list of messages holding class descriptors.
*
- * @param descriptors Class descriptors.
+ * @param descriptorIds IDs of the class descriptors.
* @return List of class descriptor network messages.
*/
@Nullable
- public List<ClassDescriptorMessage> createClassDescriptorsMessages(Set<ClassDescriptor> descriptors) {
- List<ClassDescriptorMessage> messages = descriptors.stream()
+ public List<ClassDescriptorMessage> createClassDescriptorsMessages(IntSet descriptorIds) {
+ List<ClassDescriptorMessage> messages = descriptorIds.intStream()
+ .mapToObj(serializationService::getDescriptor)
.filter(descriptor -> {
int descriptorId = descriptor.descriptorId();
return !sentDescriptors.contains(descriptorId) && !shouldBeBuiltIn(descriptorId);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInContainerMarshallers.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInContainerMarshallers.java
index 183e80d..1b1fe30 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInContainerMarshallers.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInContainerMarshallers.java
@@ -21,8 +21,6 @@ import static java.util.Collections.singletonList;
import static org.apache.ignite.internal.network.serialization.marshal.ProtocolMarshalling.writeLength;
import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
@@ -37,6 +35,8 @@ import java.util.Map;
import java.util.function.IntFunction;
import org.apache.ignite.internal.network.serialization.ClassDescriptor;
import org.apache.ignite.internal.network.serialization.Classes;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
/**
* Utility to (un)marshal built-in collections and maps.
@@ -78,7 +78,7 @@ class BuiltInContainerMarshallers {
this.typedReader = typedReader;
}
- void writeGenericRefArray(Object[] array, ClassDescriptor arrayDescriptor, DataOutputStream output, MarshallingContext context)
+ void writeGenericRefArray(Object[] array, ClassDescriptor arrayDescriptor, IgniteDataOutput output, MarshallingContext context)
throws IOException, MarshalException {
Class<?> componentType = array.getClass().getComponentType();
@@ -109,7 +109,7 @@ class BuiltInContainerMarshallers {
}
@SuppressWarnings("unchecked")
- <T> void fillGenericRefArray(DataInputStream input, T[] array, UnmarshallingContext context)
+ <T> void fillGenericRefArrayFrom(IgniteDataInput input, T[] array, UnmarshallingContext context)
throws IOException, UnmarshalException {
if (array.length == 0) {
return;
@@ -130,7 +130,7 @@ class BuiltInContainerMarshallers {
}
}
- void writeBuiltInCollection(Collection<?> object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+ void writeBuiltInCollection(Collection<?> object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
throws IOException, MarshalException {
if (supportsAsMutableBuiltInCollection(descriptor)) {
writeCollection(object, descriptor, output, context);
@@ -156,12 +156,12 @@ class BuiltInContainerMarshallers {
private void writeCollection(
Collection<?> collection,
ClassDescriptor collectionDescriptor,
- DataOutputStream output,
+ IgniteDataOutput output,
MarshallingContext context
) throws IOException, MarshalException {
- context.addUsedDescriptor(collectionDescriptor);
-
BuiltInMarshalling.writeCollection(collection, output, untypedWriter(), context);
+
+ context.addUsedDescriptor(collectionDescriptor);
}
@SuppressWarnings("unchecked")
@@ -169,7 +169,7 @@ class BuiltInContainerMarshallers {
return (ValueWriter<T>) untypedWriter;
}
- private void writeSingletonList(List<?> list, ClassDescriptor listDescriptor, DataOutputStream output, MarshallingContext context)
+ private void writeSingletonList(List<?> list, ClassDescriptor listDescriptor, IgniteDataOutput output, MarshallingContext context)
throws MarshalException, IOException {
assert list.size() == 1;
@@ -219,7 +219,7 @@ class BuiltInContainerMarshallers {
}
<T, C extends Collection<T>> void fillBuiltInCollectionFrom(
- DataInputStream input,
+ IgniteDataInput input,
C collection,
ClassDescriptor collectionDescriptor,
ValueReader<T> elementReader,
@@ -234,7 +234,7 @@ class BuiltInContainerMarshallers {
BuiltInMarshalling.fillCollectionFrom(input, collection, elementReader, context);
}
- void writeBuiltInMap(Map<?, ?> map, ClassDescriptor mapDescriptor, DataOutputStream output, MarshallingContext context)
+ void writeBuiltInMap(Map<?, ?> map, ClassDescriptor mapDescriptor, IgniteDataOutput output, MarshallingContext context)
throws MarshalException, IOException {
if (!supportsAsBuiltInMap(mapDescriptor)) {
throw new IllegalStateException("Marshalling of " + mapDescriptor.clazz() + " is not supported, but it's marked as a built-in");
@@ -277,7 +277,7 @@ class BuiltInContainerMarshallers {
}
<K, V, M extends Map<K, V>> void fillBuiltInMapFrom(
- DataInputStream input,
+ IgniteDataInput input,
M map,
ValueReader<K> keyReader,
ValueReader<V> valueReader,
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInMarshalling.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInMarshalling.java
index 6b3e798..768ab65 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInMarshalling.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInMarshalling.java
@@ -17,14 +17,13 @@
package org.apache.ignite.internal.network.serialization.marshal;
+import static java.nio.charset.StandardCharsets.ISO_8859_1;
import static java.util.Collections.singletonList;
import static org.apache.ignite.internal.network.serialization.marshal.ProtocolMarshalling.readLength;
import static org.apache.ignite.internal.network.serialization.marshal.ProtocolMarshalling.writeLength;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
@@ -32,9 +31,15 @@ import java.math.BigDecimal;
import java.util.BitSet;
import java.util.Collection;
import java.util.Date;
+import java.util.List;
import java.util.Map;
+import java.util.RandomAccess;
import java.util.UUID;
import java.util.function.IntFunction;
+import org.apache.ignite.internal.util.StringIntrospection;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataInput.Materializer;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
@@ -46,6 +51,8 @@ class BuiltInMarshalling {
private static final IntFunction<Class<?>[]> classArrayFactory = Class[]::new;
private static final ValueReader<Class<?>> classReader = BuiltInMarshalling::readClass;
+ private static final Materializer<String> LATIN1_MATERIALIZER = (bytes, offset, len) -> new String(bytes, offset, len, ISO_8859_1);
+
private static final Field singletonListElementField;
static {
@@ -65,6 +72,16 @@ class BuiltInMarshalling {
return input.readUTF();
}
+ static void writeLatin1String(String string, IgniteDataOutput output) throws IOException {
+ byte[] bytes = StringIntrospection.fastLatin1Bytes(string);
+ writeByteArray(bytes, output);
+ }
+
+ static String readLatin1String(IgniteDataInput input) throws IOException {
+ int length = readLength(input);
+ return input.materializeFromNextBytes(length, LATIN1_MATERIALIZER);
+ }
+
static Object readBareObject(@SuppressWarnings("unused") DataInput input) {
return new Object();
}
@@ -97,128 +114,128 @@ class BuiltInMarshalling {
return new Date(input.readLong());
}
- static void writeByteArray(byte[] array, DataOutput output) throws IOException {
+ static void writeByteArray(byte[] array, IgniteDataOutput output) throws IOException {
writeLength(array.length, output);
- output.write(array);
+ output.writeByteArray(array);
}
- static byte[] readByteArray(DataInput input) throws IOException {
+ static byte[] readByteArray(IgniteDataInput input) throws IOException {
int length = readLength(input);
- byte[] array = new byte[length];
- input.readFully(array);
- return array;
+ return input.readByteArray(length);
}
- static void writeShortArray(short[] array, DataOutput output) throws IOException {
+ static void writeShortArray(short[] array, IgniteDataOutput output) throws IOException {
writeLength(array.length, output);
- for (short sh : array) {
- output.writeShort(sh);
- }
+ output.writeShortArray(array);
}
- static short[] readShortArray(DataInput input) throws IOException {
+ static short[] readShortArray(IgniteDataInput input) throws IOException {
int length = readLength(input);
- short[] array = new short[length];
- for (int i = 0; i < length; i++) {
- array[i] = input.readShort();
- }
- return array;
+ return input.readShortArray(length);
}
- static void writeIntArray(int[] array, DataOutput output) throws IOException {
+ static void writeIntArray(int[] array, IgniteDataOutput output) throws IOException {
writeLength(array.length, output);
- for (int sh : array) {
- output.writeInt(sh);
- }
+ output.writeIntArray(array);
}
- static int[] readIntArray(DataInput input) throws IOException {
+ static int[] readIntArray(IgniteDataInput input) throws IOException {
int length = readLength(input);
- int[] array = new int[length];
- for (int i = 0; i < length; i++) {
- array[i] = input.readInt();
- }
- return array;
+ return input.readIntArray(length);
}
- static void writeFloatArray(float[] array, DataOutput output) throws IOException {
+ static void writeFloatArray(float[] array, IgniteDataOutput output) throws IOException {
writeLength(array.length, output);
- for (float sh : array) {
- output.writeFloat(sh);
- }
+ output.writeFloatArray(array);
}
- static float[] readFloatArray(DataInput input) throws IOException {
+ static float[] readFloatArray(IgniteDataInput input) throws IOException {
int length = readLength(input);
- float[] array = new float[length];
- for (int i = 0; i < length; i++) {
- array[i] = input.readFloat();
- }
- return array;
+ return input.readFloatArray(length);
}
- static void writeLongArray(long[] array, DataOutput output) throws IOException {
+ static void writeLongArray(long[] array, IgniteDataOutput output) throws IOException {
writeLength(array.length, output);
- for (long sh : array) {
- output.writeLong(sh);
- }
+ output.writeLongArray(array);
}
- static long[] readLongArray(DataInput input) throws IOException {
+ static long[] readLongArray(IgniteDataInput input) throws IOException {
int length = readLength(input);
- long[] array = new long[length];
- for (int i = 0; i < length; i++) {
- array[i] = input.readLong();
- }
- return array;
+ return input.readLongArray(length);
}
- static void writeDoubleArray(double[] array, DataOutput output) throws IOException {
+ static void writeDoubleArray(double[] array, IgniteDataOutput output) throws IOException {
writeLength(array.length, output);
- for (double sh : array) {
- output.writeDouble(sh);
- }
+ output.writeDoubleArray(array);
}
- static double[] readDoubleArray(DataInput input) throws IOException {
+ static double[] readDoubleArray(IgniteDataInput input) throws IOException {
int length = readLength(input);
- double[] array = new double[length];
- for (int i = 0; i < length; i++) {
- array[i] = input.readDouble();
- }
- return array;
+ return input.readDoubleArray(length);
}
- static void writeBooleanArray(boolean[] array, DataOutput output) throws IOException {
+ static void writeBooleanArray(boolean[] array, IgniteDataOutput output) throws IOException {
writeLength(array.length, output);
- for (boolean sh : array) {
- output.writeBoolean(sh);
+
+ byte bits = 0;
+ int writtenBytes = 0;
+ for (int i = 0; i < array.length; i++) {
+ boolean bit = array[i];
+ int bitIndex = i % 8;
+ if (bit) {
+ bits |= (1 << bitIndex);
+ }
+ if (bitIndex == 7) {
+ output.writeByte(bits);
+ writtenBytes++;
+ bits = 0;
+ }
+ }
+
+ int totalBytesToWrite = numberOfBytesToPackBits(array.length);
+ if (writtenBytes < totalBytesToWrite) {
+ output.writeByte(bits);
}
}
- static boolean[] readBooleanArray(DataInput input) throws IOException {
+ private static int numberOfBytesToPackBits(int length) {
+ return length / 8 + (length % 8 == 0 ? 0 : 1);
+ }
+
+ static boolean[] readBooleanArray(IgniteDataInput input) throws IOException {
int length = readLength(input);
+
boolean[] array = new boolean[length];
- for (int i = 0; i < length; i++) {
- array[i] = input.readBoolean();
+
+ int totalBytesToRead = numberOfBytesToPackBits(length);
+
+ for (int byteIndex = 0; byteIndex < totalBytesToRead; byteIndex++) {
+ byte bits = input.readByte();
+
+ int bitsToReadInThisByte;
+ if (byteIndex < totalBytesToRead - 1) {
+ bitsToReadInThisByte = 8;
+ } else {
+ bitsToReadInThisByte = length - (totalBytesToRead - 1) * 8;
+ }
+ for (int bitIndex = 0; bitIndex < bitsToReadInThisByte; bitIndex++) {
+ if ((bits & (1 << bitIndex)) != 0) {
+ array[byteIndex * 8 + bitIndex] = true;
+ }
+ }
}
+
return array;
}
- static void writeCharArray(char[] array, DataOutput output) throws IOException {
+ static void writeCharArray(char[] array, IgniteDataOutput output) throws IOException {
writeLength(array.length, output);
- for (char sh : array) {
- output.writeChar(sh);
- }
+ output.writeCharArray(array);
}
- static char[] readCharArray(DataInput input) throws IOException {
+ static char[] readCharArray(IgniteDataInput input) throws IOException {
int length = readLength(input);
- char[] array = new char[length];
- for (int i = 0; i < length; i++) {
- array[i] = input.readChar();
- }
- return array;
+ return input.readCharArray(length);
}
static void writeBigDecimal(BigDecimal object, DataOutput output) throws IOException {
@@ -256,16 +273,16 @@ class BuiltInMarshalling {
return classByName(className, context.classLoader());
}
- static void writeClassArray(Class<?>[] classes, DataOutputStream output, MarshallingContext context)
+ static void writeClassArray(Class<?>[] classes, IgniteDataOutput output, MarshallingContext context)
throws IOException, MarshalException {
writeRefArray(classes, output, classWriter, context);
}
- static Class<?>[] readClassArray(DataInputStream input, UnmarshallingContext context) throws IOException, UnmarshalException {
+ static Class<?>[] readClassArray(IgniteDataInput input, UnmarshallingContext context) throws IOException, UnmarshalException {
return readRefArray(input, classArrayFactory, classReader, context);
}
- private static <T> void writeRefArray(T[] array, DataOutputStream output, ValueWriter<T> valueWriter, MarshallingContext context)
+ private static <T> void writeRefArray(T[] array, IgniteDataOutput output, ValueWriter<T> valueWriter, MarshallingContext context)
throws IOException, MarshalException {
writeLength(array.length, output);
for (T object : array) {
@@ -274,7 +291,7 @@ class BuiltInMarshalling {
}
private static <T> T[] readRefArray(
- DataInputStream input,
+ IgniteDataInput input,
IntFunction<T[]> arrayFactory,
ValueReader<T> valueReader,
UnmarshallingContext context
@@ -287,7 +304,7 @@ class BuiltInMarshalling {
return array;
}
- private static <T> void fillRefArrayFrom(DataInputStream input, T[] array, ValueReader<T> valueReader, UnmarshallingContext context)
+ private static <T> void fillRefArrayFrom(IgniteDataInput input, T[] array, ValueReader<T> valueReader, UnmarshallingContext context)
throws IOException, UnmarshalException {
for (int i = 0; i < array.length; i++) {
array[i] = valueReader.read(input, context);
@@ -309,19 +326,35 @@ class BuiltInMarshalling {
static <T> void writeCollection(
Collection<T> collection,
- DataOutputStream output,
+ IgniteDataOutput output,
ValueWriter<T> valueWriter,
MarshallingContext context
) throws IOException, MarshalException {
writeLength(collection.size(), output);
- for (T object : collection) {
- valueWriter.write(object, output, context);
+ if (collection instanceof List && collection instanceof RandomAccess) {
+ writeRandomAccessListElements(output, valueWriter, context, (List<T>) collection);
+ } else {
+ for (T object : collection) {
+ valueWriter.write(object, output, context);
+ }
+ }
+ }
+
+ private static <T> void writeRandomAccessListElements(
+ IgniteDataOutput output,
+ ValueWriter<T> valueWriter,
+ MarshallingContext context,
+ List<T> list
+ ) throws IOException, MarshalException {
+ //noinspection ForLoopReplaceableByForEach
+ for (int i = 0; i < list.size(); i++) {
+ valueWriter.write(list.get(i), output, context);
}
}
static <T, C extends Collection<T>> void fillCollectionFrom(
- DataInputStream input,
+ IgniteDataInput input,
C collection,
ValueReader<T> valueReader,
UnmarshallingContext context
@@ -339,7 +372,7 @@ class BuiltInMarshalling {
}
static <T, C extends Collection<T>> void fillSingletonCollectionFrom(
- DataInputStream input,
+ IgniteDataInput input,
C collection,
ValueReader<T> elementReader,
UnmarshallingContext context
@@ -355,7 +388,7 @@ class BuiltInMarshalling {
static <K, V> void writeMap(
Map<K, V> map,
- DataOutputStream output,
+ IgniteDataOutput output,
ValueWriter<K> keyWriter,
ValueWriter<V> valueWriter,
MarshallingContext context
@@ -369,7 +402,7 @@ class BuiltInMarshalling {
}
static <K, V, M extends Map<K, V>> void fillMapFrom(
- DataInputStream input,
+ IgniteDataInput input,
M map,
ValueReader<K> keyReader,
ValueReader<V> valueReader,
@@ -387,11 +420,11 @@ class BuiltInMarshalling {
return mapFactory.apply(length);
}
- static void writeBitSet(BitSet object, DataOutput output) throws IOException {
+ static void writeBitSet(BitSet object, IgniteDataOutput output) throws IOException {
writeByteArray(object.toByteArray(), output);
}
- static BitSet readBitSet(DataInput input) throws IOException {
+ static BitSet readBitSet(IgniteDataInput input) throws IOException {
return BitSet.valueOf(readByteArray(input));
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInNonContainerMarshallers.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInNonContainerMarshallers.java
index 31f3693..8a6484e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInNonContainerMarshallers.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/BuiltInNonContainerMarshallers.java
@@ -18,9 +18,7 @@
package org.apache.ignite.internal.network.serialization.marshal;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.BitSet;
@@ -30,6 +28,8 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.network.serialization.ClassDescriptor;
import org.apache.ignite.internal.network.serialization.Null;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
import org.apache.ignite.lang.IgniteUuid;
/**
@@ -122,32 +122,43 @@ class BuiltInNonContainerMarshallers {
* @return {@code true} if we the given descriptor is a built-in we can handle
*/
boolean supports(ClassDescriptor descriptor) {
- return descriptor.isEnum() || builtInMarshallers.containsKey(descriptor.clazz());
+ return descriptor.isEnum() || descriptor.isLatin1String()
+ || builtInMarshallers.containsKey(descriptor.clazz());
}
- void writeBuiltIn(Object object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+ void writeBuiltIn(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
throws IOException, MarshalException {
+ actuallyWrite(object, descriptor, output, context);
+
+ context.addUsedDescriptor(descriptor);
+ }
+
+ private void actuallyWrite(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
+ throws IOException, MarshalException {
+ if (descriptor.isLatin1String()) {
+ BuiltInMarshalling.writeLatin1String((String) object, output);
+ return;
+ }
if (descriptor.isEnum()) {
- writeEnum((Enum<?>) object, descriptor, output, context);
+ BuiltInMarshalling.writeEnum((Enum<?>) object, output);
return;
}
- BuiltInMarshaller<?> builtInMarshaller = findBuiltInMarshaller(descriptor);
-
- builtInMarshaller.marshal(object, output, context);
-
- context.addUsedDescriptor(descriptor);
+ writeWithBuiltInMarshaller(object, descriptor, output, context);
}
- private void writeEnum(Enum<?> object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
- throws IOException {
- BuiltInMarshalling.writeEnum(object, output);
+ private void writeWithBuiltInMarshaller(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
+ throws IOException, MarshalException {
+ BuiltInMarshaller<?> builtInMarshaller = findBuiltInMarshaller(descriptor);
- context.addUsedDescriptor(descriptor);
+ builtInMarshaller.marshal(object, output, context);
}
- Object readBuiltIn(ClassDescriptor descriptor, DataInputStream input, UnmarshallingContext context)
+ Object readBuiltIn(ClassDescriptor descriptor, IgniteDataInput input, UnmarshallingContext context)
throws IOException, UnmarshalException {
+ if (descriptor.isLatin1String()) {
+ return BuiltInMarshalling.readLatin1String(input);
+ }
if (descriptor.isEnum()) {
return readEnum(descriptor, input);
}
@@ -157,7 +168,7 @@ class BuiltInNonContainerMarshallers {
}
@SuppressWarnings({"unchecked", "rawtypes"})
- private Object readEnum(ClassDescriptor descriptor, DataInputStream input) throws IOException {
+ private Object readEnum(ClassDescriptor descriptor, DataInput input) throws IOException {
return BuiltInMarshalling.readEnum(input, (Class<? extends Enum>) descriptor.clazz());
}
@@ -180,11 +191,11 @@ class BuiltInNonContainerMarshallers {
this.reader = reader;
}
- private void marshal(Object object, DataOutputStream output, MarshallingContext context) throws IOException, MarshalException {
+ private void marshal(Object object, IgniteDataOutput output, MarshallingContext context) throws IOException, MarshalException {
writer.write(valueRefClass.cast(object), output, context);
}
- private Object unmarshal(DataInputStream input, UnmarshallingContext context) throws IOException, UnmarshalException {
+ private Object unmarshal(IgniteDataInput input, UnmarshallingContext context) throws IOException, UnmarshalException {
return reader.read(input, context);
}
}
@@ -198,7 +209,7 @@ class BuiltInNonContainerMarshallers {
* @throws IOException if an I/O problem occurs
* @throws MarshalException if another problem occurs
*/
- void write(T value, DataOutput output) throws IOException, MarshalException;
+ void write(T value, IgniteDataOutput output) throws IOException, MarshalException;
}
@@ -211,6 +222,6 @@ class BuiltInNonContainerMarshallers {
* @throws IOException if an I/O problem occurs
* @throws UnmarshalException if another problem (like {@link ClassNotFoundException}) occurs
*/
- T read(DataInput input) throws IOException, UnmarshalException;
+ T read(IgniteDataInput input) throws IOException, UnmarshalException;
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultFieldsReaderWriter.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultFieldsReaderWriter.java
index 1e33ed8..be371d0 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultFieldsReaderWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultFieldsReaderWriter.java
@@ -18,11 +18,12 @@
package org.apache.ignite.internal.network.serialization.marshal;
import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.BitSet;
import org.apache.ignite.internal.network.serialization.ClassDescriptor;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
import org.jetbrains.annotations.Nullable;
/**
@@ -40,11 +41,11 @@ interface DefaultFieldsReaderWriter {
* @throws MarshalException if something goes wrong
* @throws IOException if I/O fails
*/
- void defaultWriteFields(Object object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+ void defaultWriteFields(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
throws MarshalException, IOException;
@Nullable
- BitSet writeNullsBitSet(Object object, ClassDescriptor descriptor, DataOutputStream output) throws IOException;
+ BitSet writeNullsBitSet(Object object, ClassDescriptor descriptor, DataOutput output) throws IOException;
/**
* Reads object fields from the input stream and stores them in the object using default marshalling.
@@ -56,7 +57,7 @@ interface DefaultFieldsReaderWriter {
* @throws IOException if I/O fails
* @throws UnmarshalException if something goes wrong
*/
- void defaultFillFieldsFrom(DataInputStream input, Object object, ClassDescriptor descriptor, UnmarshallingContext context)
+ void defaultFillFieldsFrom(IgniteDataInput input, Object object, ClassDescriptor descriptor, UnmarshallingContext context)
throws IOException, UnmarshalException;
@Nullable
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshaller.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshaller.java
index e4ec1ed..34d1039 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshaller.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshaller.java
@@ -19,12 +19,8 @@ package org.apache.ignite.internal.network.serialization.marshal;
import static org.apache.ignite.internal.network.serialization.marshal.ObjectClass.objectClass;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.InputStream;
@@ -37,6 +33,9 @@ import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
import org.apache.ignite.internal.network.serialization.DescriptorRegistry;
import org.apache.ignite.internal.network.serialization.SpecialMethodInvocationException;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
import org.jetbrains.annotations.Nullable;
/**
@@ -59,8 +58,16 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
private final ExternalizableMarshaller externalizableMarshaller;
private final ProxyMarshaller proxyMarshaller;
+ private final MarshallingValidations validations = new MarshallingValidations();
+
private final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ private final ThreadLocal<UosIgniteOutputStream> threadLocalDataOutput = ThreadLocal.withInitial(this::newOutput);
+
+ private UosIgniteOutputStream newOutput() {
+ return new UosIgniteOutputStream(4096);
+ }
+
/**
* Constructor.
*
@@ -94,17 +101,34 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
public MarshalledObject marshal(@Nullable Object object) throws MarshalException {
MarshallingContext context = new MarshallingContext();
- var baos = new ByteArrayOutputStream();
- try (var dos = new DataOutputStream(baos)) {
- marshalShared(object, dos, context);
+ UosIgniteOutputStream output = freshByteArrayOutputStream();
+ try {
+ marshalShared(object, output, context);
} catch (IOException e) {
throw new MarshalException("Cannot marshal", e);
+ } finally {
+ output.release();
}
- return new MarshalledObject(baos.toByteArray(), context.usedDescriptors());
+ return new MarshalledObject(output.array(), context.usedDescriptorIds());
}
- private void marshalShared(@Nullable Object object, DataOutputStream output, MarshallingContext context)
+ private UosIgniteOutputStream freshByteArrayOutputStream() {
+ UosIgniteOutputStream output = threadLocalDataOutput.get();
+
+ if (output.isOccupied()) {
+ // This is a nested invocation, probably from a callback method like writeObject(). We can't reuse
+ // the 'outer' output, so we create a new one; this is acceptable as it should not happen often.
+ output = newOutput();
+ } else {
+ output.cleanup();
+ }
+ output.occupy();
+
+ return output;
+ }
+
+ private void marshalShared(@Nullable Object object, IgniteDataOutput output, MarshallingContext context)
throws MarshalException, IOException {
marshalShared(object, NO_DECLARED_CLASS, output, context);
}
@@ -112,13 +136,13 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
private void marshalShared(
@Nullable Object object,
@Nullable Class<?> declaredClass,
- DataOutputStream output,
+ IgniteDataOutput output,
MarshallingContext context
) throws MarshalException, IOException {
marshalToOutput(object, declaredClass, output, context, NOT_UNSHARED);
}
- private void marshalUnshared(@Nullable Object object, Class<?> declaredClass, DataOutputStream output, MarshallingContext context)
+ private void marshalUnshared(@Nullable Object object, Class<?> declaredClass, IgniteDataOutput output, MarshallingContext context)
throws MarshalException, IOException {
marshalToOutput(object, declaredClass, output, context, UNSHARED);
}
@@ -126,7 +150,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
private void marshalToOutput(
@Nullable Object object,
@Nullable Class<?> declaredClass,
- DataOutputStream output,
+ IgniteDataOutput output,
MarshallingContext context,
boolean unshared
) throws MarshalException, IOException {
@@ -137,7 +161,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
|| declaredClass.isAssignableFrom(object.getClass())
: "Object " + object + " is expected to be an instance of subclass of " + declaredClass + ", but it's " + object.getClass();
- MarshallingValidations.throwIfMarshallingNotSupported(object);
+ validations.throwIfMarshallingNotSupported(object);
ClassDescriptor originalDescriptor = localDescriptors.getOrCreateDescriptor(object, declaredClass);
@@ -207,7 +231,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
ClassDescriptor descriptor,
@Nullable Class<?> declaredClass,
int objectId,
- DataOutputStream output,
+ IgniteDataOutput output,
MarshallingContext context
) throws IOException, MarshalException {
if (!runtimeTypeIsKnownUpfront(declaredClass)) {
@@ -235,7 +259,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
Object object,
ClassDescriptor descriptor,
Class<?> declaredClass,
- DataOutputStream output,
+ IgniteDataOutput output,
MarshallingContext context
) throws IOException, MarshalException {
if (!runtimeTypeIsKnownUpfront(declaredClass)) {
@@ -245,7 +269,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
writeObject(object, descriptor, output, context);
}
- private void writeObject(@Nullable Object object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+ private void writeObject(@Nullable Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
throws IOException, MarshalException {
if (isBuiltInNonContainer(descriptor)) {
builtInNonContainerMarshallers.writeBuiltIn(object, descriptor, output, context);
@@ -286,12 +310,13 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
@Override
@Nullable
public <T> T unmarshal(byte[] bytes, DescriptorRegistry mergedDescriptors) throws UnmarshalException {
- var bais = new ByteArrayInputStream(bytes);
- try (var dis = new DataInputStream(bais)) {
- UnmarshallingContext context = new UnmarshallingContext(bais, mergedDescriptors, classLoader);
- T result = unmarshalShared(dis, context);
+ var input = new IgniteUnsafeDataInput(bytes);
+
+ try {
+ UnmarshallingContext context = new UnmarshallingContext(input, mergedDescriptors, classLoader);
+ T result = unmarshalShared(input, context);
- throwIfNotDrained(dis);
+ throwIfNotDrained(input);
return result;
} catch (IOException e) {
@@ -299,22 +324,22 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
}
}
- private <T> T unmarshalShared(DataInputStream input, UnmarshallingContext context) throws IOException, UnmarshalException {
+ private <T> T unmarshalShared(IgniteDataInput input, UnmarshallingContext context) throws IOException, UnmarshalException {
return unmarshalShared(input, NO_DECLARED_CLASS, context);
}
- private <T> T unmarshalShared(DataInputStream input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
+ private <T> T unmarshalShared(IgniteDataInput input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
throws IOException, UnmarshalException {
return unmarshalFromInput(input, declaredClass, context, NOT_UNSHARED);
}
- private <T> T unmarshalUnshared(DataInputStream input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
+ private <T> T unmarshalUnshared(IgniteDataInput input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
throws IOException, UnmarshalException {
return unmarshalFromInput(input, declaredClass, context, UNSHARED);
}
private <T> T unmarshalFromInput(
- DataInputStream input,
+ IgniteDataInput input,
@Nullable Class<?> declaredClass,
UnmarshallingContext context,
boolean unshared
@@ -335,7 +360,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
return resolvedObject;
}
- private ClassDescriptor resolveDescriptor(DataInputStream input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
+ private ClassDescriptor resolveDescriptor(IgniteDataInput input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
throws UnmarshalException, IOException {
if (runtimeTypeIsKnownUpfront(declaredClass)) {
return context.resolveDescriptorOfDeclaredClass(declaredClass);
@@ -345,7 +370,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
}
}
- private int peekObjectId(DataInputStream input, UnmarshallingContext context) throws IOException {
+ private int peekObjectId(DataInput input, UnmarshallingContext context) throws IOException {
context.markSource(ProtocolMarshalling.MAX_LENGTH_BYTE_COUNT);
int objectId = ProtocolMarshalling.readObjectId(input);
context.resetSourceToMark();
@@ -367,7 +392,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
}
@Nullable
- private Object readObject(DataInputStream input, UnmarshallingContext context, ClassDescriptor descriptor, boolean unshared)
+ private Object readObject(IgniteDataInput input, UnmarshallingContext context, ClassDescriptor descriptor, boolean unshared)
throws IOException, UnmarshalException {
if (!mayHaveObjectIdentity(descriptor)) {
return readValue(input, descriptor, context);
@@ -384,7 +409,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
@Nullable
private Object readIdentifiableInOneStage(
- DataInputStream input,
+ IgniteDataInput input,
ClassDescriptor descriptor,
UnmarshallingContext context,
boolean unshared
@@ -397,12 +422,12 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
return object;
}
- private int readObjectId(DataInputStream input) throws IOException {
+ private int readObjectId(DataInput input) throws IOException {
return ProtocolMarshalling.readObjectId(input);
}
private Object readIdentifiableInTwoStages(
- DataInputStream input,
+ IgniteDataInput input,
ClassDescriptor descriptor,
UnmarshallingContext context,
boolean unshared
@@ -417,7 +442,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
return preInstantiatedObject;
}
- private Object preInstantiate(ClassDescriptor descriptor, DataInputStream input, UnmarshallingContext context)
+ private Object preInstantiate(ClassDescriptor descriptor, IgniteDataInput input, UnmarshallingContext context)
throws IOException, UnmarshalException {
if (isBuiltInNonContainer(descriptor)) {
throw new IllegalStateException("Should not be here, descriptor is " + descriptor);
@@ -436,7 +461,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
}
}
- private void fillObjectFrom(DataInputStream input, Object objectToFill, ClassDescriptor descriptor, UnmarshallingContext context)
+ private void fillObjectFrom(IgniteDataInput input, Object objectToFill, ClassDescriptor descriptor, UnmarshallingContext context)
throws UnmarshalException, IOException {
if (isBuiltInNonContainer(descriptor)) {
throw new IllegalStateException("Cannot fill " + descriptor.clazz() + ", this is a programmatic error");
@@ -456,7 +481,7 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
}
private void fillBuiltInCollectionFrom(
- DataInputStream input,
+ IgniteDataInput input,
Collection<?> collectionToFill,
ClassDescriptor descriptor,
UnmarshallingContext context
@@ -464,18 +489,18 @@ public class DefaultUserObjectMarshaller implements UserObjectMarshaller {
builtInContainerMarshallers.fillBuiltInCollectionFrom(input, collectionToFill, descriptor, this::unmarshalShared, context);
}
- private void fillBuiltInMapFrom(DataInputStream input, Map<?, ?> mapToFill, UnmarshallingContext context)
+ private void fillBuiltInMapFrom(IgniteDataInput input, Map<?, ?> mapToFill, UnmarshallingContext context)
throws UnmarshalException, IOException {
builtInContainerMarshallers.fillBuiltInMapFrom(input, mapToFill, this::unmarshalShared, this::unmarshalShared, context);
}
- private void fillGenericRefArrayFrom(DataInputStream input, Object[] array, UnmarshallingContext context)
+ private void fillGenericRefArrayFrom(IgniteDataInput input, Object[] array, UnmarshallingContext context)
throws IOException, UnmarshalException {
- builtInContainerMarshallers.fillGenericRefArray(input, array, context);
+ builtInContainerMarshallers.fillGenericRefArrayFrom(input, array, context);
}
@Nullable
- private Object readValue(DataInputStream input, ClassDescriptor descriptor, UnmarshallingContext context)
+ private Object readValue(IgniteDataInput input, ClassDescriptor descriptor, UnmarshallingContext context)
throws IOException, UnmarshalException {
if (isBuiltInNonContainer(descriptor)) {
return builtInNonContainerMarshallers.readBuiltIn(descriptor, input, context);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ExternalizableMarshaller.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ExternalizableMarshaller.java
index fa6b753..eb9d5c7 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ExternalizableMarshaller.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ExternalizableMarshaller.java
@@ -17,11 +17,11 @@
package org.apache.ignite.internal.network.serialization.marshal;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import org.apache.ignite.internal.network.serialization.ClassDescriptor;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
/**
* (Um)marshalling specific to EXTERNALIZABLE serialization type.
@@ -49,14 +49,14 @@ class ExternalizableMarshaller {
this.defaultFieldsReaderWriter = defaultFieldsReaderWriter;
}
- void writeExternalizable(Externalizable externalizable, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+ void writeExternalizable(Externalizable externalizable, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
throws IOException {
externalizeTo(externalizable, output, context);
context.addUsedDescriptor(descriptor);
}
- private void externalizeTo(Externalizable externalizable, DataOutputStream output, MarshallingContext context)
+ private void externalizeTo(Externalizable externalizable, IgniteDataOutput output, MarshallingContext context)
throws IOException {
// Do not close the stream yet!
UosObjectOutputStream oos = context.objectOutputStream(output, valueWriter, unsharedWriter, defaultFieldsReaderWriter);
@@ -81,7 +81,7 @@ class ExternalizableMarshaller {
}
}
- <T extends Externalizable> void fillExternalizableFrom(DataInputStream input, T object, UnmarshallingContext context)
+ <T extends Externalizable> void fillExternalizableFrom(IgniteDataInput input, T object, UnmarshallingContext context)
throws IOException, UnmarshalException {
// Do not close the stream yet!
UosObjectInputStream ois = context.objectInputStream(input, valueReader, unsharedReader, defaultFieldsReaderWriter);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/Bits.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LittleEndianBits.java
similarity index 60%
rename from modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/Bits.java
rename to modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LittleEndianBits.java
index ae5085c..d64a8c7 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/Bits.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LittleEndianBits.java
@@ -19,27 +19,28 @@ package org.apache.ignite.internal.network.serialization.marshal;
/**
* Code for packing/unpacking primitive values to/from a byte array at specific offsets.
+ * Little endian byte order is used.
*/
-class Bits {
+class LittleEndianBits {
static boolean getBoolean(byte[] b, int off) {
return b[off] != 0;
}
static char getChar(byte[] b, int off) {
- return (char) ((b[off + 1] & 0xFF)
- + (b[off] << 8));
+ return (char) ((b[off] & 0xFF)
+ + (b[off + 1] << 8));
}
static short getShort(byte[] b, int off) {
- return (short) ((b[off + 1] & 0xFF)
- + (b[off] << 8));
+ return (short) ((b[off] & 0xFF)
+ + (b[off + 1] << 8));
}
static int getInt(byte[] b, int off) {
- return ((b[off + 3] & 0xFF))
- + ((b[off + 2] & 0xFF) << 8)
- + ((b[off + 1] & 0xFF) << 16)
- + ((b[off]) << 24);
+ return ((b[off] & 0xFF))
+ + ((b[off + 1] & 0xFF) << 8)
+ + ((b[off + 2] & 0xFF) << 16)
+ + ((b[off + 3]) << 24);
}
static float getFloat(byte[] b, int off) {
@@ -47,14 +48,14 @@ class Bits {
}
static long getLong(byte[] b, int off) {
- return ((b[off + 7] & 0xFFL))
- + ((b[off + 6] & 0xFFL) << 8)
- + ((b[off + 5] & 0xFFL) << 16)
- + ((b[off + 4] & 0xFFL) << 24)
- + ((b[off + 3] & 0xFFL) << 32)
- + ((b[off + 2] & 0xFFL) << 40)
- + ((b[off + 1] & 0xFFL) << 48)
- + (((long) b[off]) << 56);
+ return ((b[off] & 0xFFL))
+ + ((b[off + 1] & 0xFFL) << 8)
+ + ((b[off + 2] & 0xFFL) << 16)
+ + ((b[off + 3] & 0xFFL) << 24)
+ + ((b[off + 4] & 0xFFL) << 32)
+ + ((b[off + 5] & 0xFFL) << 40)
+ + ((b[off + 6] & 0xFFL) << 48)
+ + (((long) b[off + 7]) << 56);
}
static double getDouble(byte[] b, int off) {
@@ -66,20 +67,20 @@ class Bits {
}
static void putChar(byte[] b, int off, char val) {
- b[off + 1] = (byte) (val);
- b[off] = (byte) (val >>> 8);
+ b[off] = (byte) (val);
+ b[off + 1] = (byte) (val >>> 8);
}
static void putShort(byte[] b, int off, short val) {
- b[off + 1] = (byte) (val);
- b[off] = (byte) (val >>> 8);
+ b[off] = (byte) (val);
+ b[off + 1] = (byte) (val >>> 8);
}
static void putInt(byte[] b, int off, int val) {
- b[off + 3] = (byte) (val);
- b[off + 2] = (byte) (val >>> 8);
- b[off + 1] = (byte) (val >>> 16);
- b[off] = (byte) (val >>> 24);
+ b[off] = (byte) (val);
+ b[off + 1] = (byte) (val >>> 8);
+ b[off + 2] = (byte) (val >>> 16);
+ b[off + 3] = (byte) (val >>> 24);
}
static void putFloat(byte[] b, int off, float val) {
@@ -87,14 +88,14 @@ class Bits {
}
static void putLong(byte[] b, int off, long val) {
- b[off + 7] = (byte) (val);
- b[off + 6] = (byte) (val >>> 8);
- b[off + 5] = (byte) (val >>> 16);
- b[off + 4] = (byte) (val >>> 24);
- b[off + 3] = (byte) (val >>> 32);
- b[off + 2] = (byte) (val >>> 40);
- b[off + 1] = (byte) (val >>> 48);
- b[off] = (byte) (val >>> 56);
+ b[off] = (byte) (val);
+ b[off + 1] = (byte) (val >>> 8);
+ b[off + 2] = (byte) (val >>> 16);
+ b[off + 3] = (byte) (val >>> 24);
+ b[off + 4] = (byte) (val >>> 32);
+ b[off + 5] = (byte) (val >>> 40);
+ b[off + 6] = (byte) (val >>> 48);
+ b[off + 7] = (byte) (val >>> 56);
}
static void putDouble(byte[] b, int off, double val) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LocalDescriptors.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LocalDescriptors.java
index 8da1e46..6257637 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LocalDescriptors.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/LocalDescriptors.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.network.serialization.marshal;
+import org.apache.ignite.internal.network.serialization.BuiltInType;
import org.apache.ignite.internal.network.serialization.ClassDescriptor;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.util.StringIntrospection;
import org.jetbrains.annotations.Nullable;
/**
@@ -38,6 +40,9 @@ class LocalDescriptors {
if (object == null) {
return localRegistry.getNullDescriptor();
}
+ if (object instanceof String && StringIntrospection.supportsFastGetLatin1Bytes((String) object)) {
+ return localRegistry.getBuiltInDescriptor(BuiltInType.STRING_LATIN1);
+ }
// For primitives, we need to keep the declaredClass (it differs from object.getClass()).
Class<?> classToQueryForOriginalDescriptor = declaredClass != null && declaredClass.isPrimitive()
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshalledObject.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshalledObject.java
index 761d5e3..70d1396 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshalledObject.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshalledObject.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.network.serialization.marshal;
-import java.util.Arrays;
+import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.Objects;
-import java.util.Set;
-import org.apache.ignite.internal.network.serialization.ClassDescriptor;
/**
* Represents a marshalled object: the marshalled representation with information about how it was marshalled
@@ -30,21 +28,21 @@ public class MarshalledObject {
/** Marshalled object representation. */
private final byte[] bytes;
- /** The descriptors that were used while marshalling the object. */
- private final Set<ClassDescriptor> usedDescriptors;
+ /** IDs of the descriptors that were used while marshalling the object. */
+ private final IntSet usedDescriptorIds;
/**
* Creates a new {@link MarshalledObject}.
*
* @param bytes marshalled representation bytes
- * @param usedDescriptors the descriptors that were used to marshal the object
+ * @param usedDescriptorIds IDs of the descriptors that were used to marshal the object
*/
- public MarshalledObject(byte[] bytes, Set<ClassDescriptor> usedDescriptors) {
+ public MarshalledObject(byte[] bytes, IntSet usedDescriptorIds) {
Objects.requireNonNull(bytes, "bytes is null");
- Objects.requireNonNull(usedDescriptors, "usedDescriptors is null");
+ Objects.requireNonNull(usedDescriptorIds, "usedDescriptorIds is null");
- this.bytes = Arrays.copyOf(bytes, bytes.length);
- this.usedDescriptors = Set.copyOf(usedDescriptors);
+ this.bytes = bytes;
+ this.usedDescriptorIds = usedDescriptorIds;
}
/**
@@ -53,15 +51,15 @@ public class MarshalledObject {
* @return marshalled object representation
*/
public byte[] bytes() {
- return Arrays.copyOf(bytes, bytes.length);
+ return bytes;
}
/**
- * Returns the descriptors that were used while marshalling the object.
+ * Returns IDs of the descriptors that were used while marshalling the object.
*
- * @return the descriptors that were used while marshalling the object
+ * @return IDs of the descriptors that were used while marshalling the object
*/
- public Set<ClassDescriptor> usedDescriptors() {
- return usedDescriptors;
+ public IntSet usedDescriptorIds() {
+ return usedDescriptorIds;
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingContext.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingContext.java
index 4f90487..57b3c53 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingContext.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingContext.java
@@ -17,25 +17,24 @@
package org.apache.ignite.internal.network.serialization.marshal;
-import static java.util.Collections.unmodifiableSet;
-
-import java.io.DataOutputStream;
+import it.unimi.dsi.fastutil.Hash;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenCustomHashMap;
import java.io.IOException;
import java.io.NotActiveException;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import org.apache.ignite.internal.network.serialization.ClassDescriptor;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
/**
* Context using during marshalling of an object graph accessible from a root object.
*/
class MarshallingContext {
- private final Set<ClassDescriptor> usedDescriptors = new HashSet<>();
+ private final IntSet usedDescriptorIds = new IntOpenHashSet();
- private final Map<Object, Integer> objectsToIds = new IdentityHashMap<>();
+ private final Object2IntMap<Object> objectsToIds = new Object2IntOpenCustomHashMap<>(new IdentityHashStrategy());
private int nextObjectId = 0;
@@ -45,11 +44,11 @@ class MarshallingContext {
private UosObjectOutputStream objectOutputStream;
public void addUsedDescriptor(ClassDescriptor descriptor) {
- usedDescriptors.add(descriptor);
+ usedDescriptorIds.add(descriptor.descriptorId());
}
- public Set<ClassDescriptor> usedDescriptors() {
- return unmodifiableSet(usedDescriptors);
+ public IntSet usedDescriptorIds() {
+ return usedDescriptorIds;
}
/**
@@ -71,9 +70,8 @@ class MarshallingContext {
return FlaggedObjectIds.freshObjectId(newId);
}
- Integer prevId = objectsToIds.get(object);
- if (prevId != null) {
- return FlaggedObjectIds.alreadySeenObjectId(prevId);
+ if (objectsToIds.containsKey(object)) {
+ return FlaggedObjectIds.alreadySeenObjectId(objectsToIds.getInt(object));
} else {
int newId = nextId();
@@ -114,7 +112,7 @@ class MarshallingContext {
}
UosObjectOutputStream objectOutputStream(
- DataOutputStream output,
+ IgniteDataOutput output,
TypedValueWriter valueWriter,
TypedValueWriter unsharedWriter,
DefaultFieldsReaderWriter defaultFieldsReaderWriter
@@ -125,4 +123,18 @@ class MarshallingContext {
return objectOutputStream;
}
+
+ private static class IdentityHashStrategy implements Hash.Strategy<Object> {
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode(Object o) {
+ return System.identityHashCode(o);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object a, Object b) {
+ return a == b;
+ }
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingValidations.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingValidations.java
index 5d736f8..80a0e21 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingValidations.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/MarshallingValidations.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.network.serialization.marshal;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.function.Function;
import org.apache.ignite.internal.network.serialization.Classes;
import org.jetbrains.annotations.Nullable;
@@ -26,32 +29,58 @@ import org.jetbrains.annotations.Nullable;
* Validations that are run before marshalling objects.
*/
class MarshallingValidations {
- static void throwIfMarshallingNotSupported(@Nullable Object object) {
+ private final Map<Class<?>, Marshallability> marshallabilityMap = new WeakHashMap<>();
+
+ private final Function<Class<?>, Marshallability> marshallabilityFunction = this::marshallability;
+
+ void throwIfMarshallingNotSupported(@Nullable Object object) {
if (object == null) {
return;
}
- if (Enum.class.isAssignableFrom(object.getClass())) {
- return;
- }
Class<?> objectClass = object.getClass();
+
+ Marshallability marshallability = marshallabilityMap.computeIfAbsent(objectClass, marshallabilityFunction);
+
+ switch (marshallability) {
+ case INNER_CLASS:
+ throw new MarshallingNotSupportedException("Non-static inner class instances are not supported for marshalling: "
+ + objectClass);
+ case CAPTUREING_CLOSURE:
+ throw new MarshallingNotSupportedException("Capturing nested class instances are not supported for marshalling: " + object);
+ case NON_SERIALIZABLE_LAMBDA:
+ throw new MarshallingNotSupportedException("Non-serializable lambda instances are not supported for marshalling: "
+ + object);
+ default:
+ // do nothing
+ }
+ }
+
+ private Marshallability marshallability(Class<?> objectClass) {
+ if (Enum.class.isAssignableFrom(objectClass)) {
+ return Marshallability.OK;
+ }
+
if (isInnerClass(objectClass)) {
- throw new MarshallingNotSupportedException("Non-static inner class instances are not supported for marshalling: "
- + objectClass);
+ return Marshallability.INNER_CLASS;
}
+
if (isCapturingClosure(objectClass)) {
- throw new MarshallingNotSupportedException("Capturing nested class instances are not supported for marshalling: " + object);
+ return Marshallability.CAPTUREING_CLOSURE;
}
- if (Classes.isLambda(objectClass) && !Classes.isSerializable(objectClass)) {
- throw new MarshallingNotSupportedException("Non-serializable lambda instances are not supported for marshalling: " + object);
+
+ if (isNonSerializableLambda(objectClass)) {
+ return Marshallability.NON_SERIALIZABLE_LAMBDA;
}
+
+ return Marshallability.OK;
}
- private static boolean isInnerClass(Class<?> objectClass) {
+ private boolean isInnerClass(Class<?> objectClass) {
return objectClass.getDeclaringClass() != null && !Modifier.isStatic(objectClass.getModifiers());
}
- private static boolean isCapturingClosure(Class<?> objectClass) {
+ private boolean isCapturingClosure(Class<?> objectClass) {
for (Field field : objectClass.getDeclaredFields()) {
if ((field.isSynthetic() && field.getName().equals("this$0"))
|| field.getName().startsWith("arg$")) {
@@ -62,6 +91,14 @@ class MarshallingValidations {
return false;
}
- private MarshallingValidations() {
+ private boolean isNonSerializableLambda(Class<?> objectClass) {
+ return Classes.isLambda(objectClass) && !Classes.isSerializable(objectClass);
+ }
+
+ private enum Marshallability {
+ OK,
+ INNER_CLASS,
+ CAPTUREING_CLOSURE,
+ NON_SERIALIZABLE_LAMBDA
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
index 7dcf45a..3ee474f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProtocolMarshalling.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.BitSet;
+import org.apache.ignite.internal.util.io.VarInts;
/**
* Protocol-wide elements marshalling.
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProxyMarshaller.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProxyMarshaller.java
index dfb8e7e..4ca724f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProxyMarshaller.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ProxyMarshaller.java
@@ -17,13 +17,13 @@
package org.apache.ignite.internal.network.serialization.marshal;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
/**
* (Un)marshalling logic specific to {@link Proxy} instances.
@@ -61,7 +61,7 @@ class ProxyMarshaller {
return Proxy.isProxyClass(classToCheck);
}
- void writeProxy(Object proxy, DataOutputStream output, MarshallingContext context) throws MarshalException, IOException {
+ void writeProxy(Object proxy, IgniteDataOutput output, MarshallingContext context) throws MarshalException, IOException {
assert Proxy.isProxyClass(proxy.getClass());
BuiltInMarshalling.writeClassArray(proxy.getClass().getInterfaces(), output, context);
@@ -69,18 +69,18 @@ class ProxyMarshaller {
valueWriter.write(Proxy.getInvocationHandler(proxy), output, context);
}
- Object preInstantiateProxy(DataInputStream input, UnmarshallingContext context) throws UnmarshalException, IOException {
+ Object preInstantiateProxy(IgniteDataInput input, UnmarshallingContext context) throws UnmarshalException, IOException {
Class<?>[] interfaces = BuiltInMarshalling.readClassArray(input, context);
return Proxy.newProxyInstance(context.classLoader(), interfaces, placeholderInvocationHandler);
}
- void fillProxyFrom(DataInputStream input, Object proxyToFill, UnmarshallingContext context) throws UnmarshalException, IOException {
+ void fillProxyFrom(IgniteDataInput input, Object proxyToFill, UnmarshallingContext context) throws UnmarshalException, IOException {
InvocationHandler invocationHandler = readInvocationHandler(input, context);
replaceInvocationHandler(proxyToFill, invocationHandler);
}
- private InvocationHandler readInvocationHandler(DataInputStream input, UnmarshallingContext context)
+ private InvocationHandler readInvocationHandler(IgniteDataInput input, UnmarshallingContext context)
throws IOException, UnmarshalException {
Object object = valueReader.read(input, context);
if (!(object instanceof InvocationHandler)) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/StructuredObjectMarshaller.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/StructuredObjectMarshaller.java
index 18ffa33..11ff853 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/StructuredObjectMarshaller.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/StructuredObjectMarshaller.java
@@ -18,12 +18,9 @@
package org.apache.ignite.internal.network.serialization.marshal;
import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.BitSet;
-import java.util.Collections;
import java.util.List;
import org.apache.ignite.internal.network.serialization.BuiltInTypeIds;
import org.apache.ignite.internal.network.serialization.ClassDescriptor;
@@ -33,6 +30,8 @@ import org.apache.ignite.internal.network.serialization.IdIndexedDescriptors;
import org.apache.ignite.internal.network.serialization.SpecialMethodInvocationException;
import org.apache.ignite.internal.network.serialization.marshal.UosObjectInputStream.UosGetField;
import org.apache.ignite.internal.network.serialization.marshal.UosObjectOutputStream.UosPutField;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
import org.jetbrains.annotations.Nullable;
/**
@@ -68,34 +67,17 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
);
}
- void writeStructuredObject(Object object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+ void writeStructuredObject(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
throws MarshalException, IOException {
- for (ClassDescriptor layer : lineage(descriptor)) {
+ List<ClassDescriptor> lineage = descriptor.lineage();
+
+ // NOTE: an enhanced for loop still obtains an Iterator from the list; only an indexed loop would avoid that allocation
+ for (ClassDescriptor layer : lineage) {
writeStructuredObjectLayer(object, layer, output, context);
}
}
- /**
- * Returns the lineage (all the ancestors, from the progenitor (excluding Object) down the line, including the given class).
- *
- * @param descriptor class from which to obtain lineage
- * @return ancestors from the progenitor (excluding Object) down the line, plus the given class itself
- */
- private List<ClassDescriptor> lineage(ClassDescriptor descriptor) {
- List<ClassDescriptor> descriptors = new ArrayList<>();
-
- ClassDescriptor currentDesc = descriptor;
- while (currentDesc != null) {
- descriptors.add(currentDesc);
- currentDesc = currentDesc.superClassDescriptor();
- }
-
- Collections.reverse(descriptors);
-
- return descriptors;
- }
-
- private void writeStructuredObjectLayer(Object object, ClassDescriptor layer, DataOutputStream output, MarshallingContext context)
+ private void writeStructuredObjectLayer(Object object, ClassDescriptor layer, IgniteDataOutput output, MarshallingContext context)
throws IOException, MarshalException {
if (layer.hasWriteObject()) {
writeWithWriteObject(object, layer, output, context);
@@ -106,7 +88,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
context.addUsedDescriptor(layer);
}
- private void writeWithWriteObject(Object object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+ private void writeWithWriteObject(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
throws IOException, MarshalException {
// Do not close the stream yet!
UosObjectOutputStream oos = context.objectOutputStream(output, valueWriter, unsharedWriter, this);
@@ -127,7 +109,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
/** {@inheritDoc} */
@Override
- public void defaultWriteFields(Object object, ClassDescriptor descriptor, DataOutputStream output, MarshallingContext context)
+ public void defaultWriteFields(Object object, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
throws MarshalException, IOException {
@Nullable BitSet nullsBitSet = writeNullsBitSet(object, descriptor, output);
@@ -149,7 +131,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
/** {@inheritDoc} */
@Override
@Nullable
- public BitSet writeNullsBitSet(Object object, ClassDescriptor descriptor, DataOutputStream output) throws IOException {
+ public BitSet writeNullsBitSet(Object object, ClassDescriptor descriptor, DataOutput output) throws IOException {
BitSet nullsBitSet = descriptor.fieldIndexInNullsBitmapSize() == 0 ? null : new BitSet(descriptor.fieldIndexInNullsBitmapSize());
for (FieldDescriptor fieldDescriptor : descriptor.fields()) {
@@ -170,7 +152,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
return nullsBitSet;
}
- private void writeField(Object object, FieldDescriptor fieldDescriptor, DataOutputStream output, MarshallingContext context)
+ private void writeField(Object object, FieldDescriptor fieldDescriptor, IgniteDataOutput output, MarshallingContext context)
throws MarshalException, IOException {
if (fieldDescriptor.clazz().isPrimitive()) {
writePrimitiveFieldValue(object, fieldDescriptor, output);
@@ -186,7 +168,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
return fieldDescriptor.accessor().getObject(object);
}
- private void writePrimitiveFieldValue(Object object, FieldDescriptor fieldDescriptor, DataOutputStream output) throws IOException {
+ private void writePrimitiveFieldValue(Object object, FieldDescriptor fieldDescriptor, IgniteDataOutput output) throws IOException {
FieldAccessor fieldAccessor = fieldDescriptor.accessor();
switch (fieldDescriptor.typeDescriptorId()) {
@@ -227,14 +209,17 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
}
}
- void fillStructuredObjectFrom(DataInputStream input, Object object, ClassDescriptor descriptor, UnmarshallingContext context)
+ void fillStructuredObjectFrom(IgniteDataInput input, Object object, ClassDescriptor descriptor, UnmarshallingContext context)
throws IOException, UnmarshalException {
- for (ClassDescriptor layer : lineage(descriptor)) {
+ List<ClassDescriptor> lineage = descriptor.lineage();
+
+ // NOTE: an enhanced for loop still obtains an Iterator from the list; only an indexed loop would avoid that allocation
+ for (ClassDescriptor layer : lineage) {
fillStructuredObjectLayerFrom(input, layer, object, context);
}
}
- private void fillStructuredObjectLayerFrom(DataInputStream input, ClassDescriptor layer, Object object, UnmarshallingContext context)
+ private void fillStructuredObjectLayerFrom(IgniteDataInput input, ClassDescriptor layer, Object object, UnmarshallingContext context)
throws IOException, UnmarshalException {
if (layer.hasReadObject()) {
fillObjectWithReadObjectFrom(input, object, layer, context);
@@ -244,7 +229,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
}
private void fillObjectWithReadObjectFrom(
- DataInputStream input,
+ IgniteDataInput input,
Object object,
ClassDescriptor descriptor,
UnmarshallingContext context
@@ -267,7 +252,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
/** {@inheritDoc} */
@Override
- public void defaultFillFieldsFrom(DataInputStream input, Object object, ClassDescriptor descriptor, UnmarshallingContext context)
+ public void defaultFillFieldsFrom(IgniteDataInput input, Object object, ClassDescriptor descriptor, UnmarshallingContext context)
throws IOException, UnmarshalException {
@Nullable BitSet nullsBitSet = readNullsBitSet(input, descriptor);
@@ -299,7 +284,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
return ProtocolMarshalling.readFixedLengthBitSet(descriptor.fieldIndexInNullsBitmapSize(), input);
}
- private void fillFieldFrom(DataInputStream input, Object object, UnmarshallingContext context, FieldDescriptor fieldDescriptor)
+ private void fillFieldFrom(IgniteDataInput input, Object object, UnmarshallingContext context, FieldDescriptor fieldDescriptor)
throws IOException, UnmarshalException {
if (fieldDescriptor.clazz().isPrimitive()) {
fillPrimitiveFieldFrom(input, object, fieldDescriptor);
@@ -313,7 +298,7 @@ class StructuredObjectMarshaller implements DefaultFieldsReaderWriter {
fieldDescriptor.accessor().setObject(object, fieldValue);
}
- private void fillPrimitiveFieldFrom(DataInputStream input, Object object, FieldDescriptor fieldDescriptor) throws IOException {
+ private void fillPrimitiveFieldFrom(DataInput input, Object object, FieldDescriptor fieldDescriptor) throws IOException {
FieldAccessor fieldAccessor = fieldDescriptor.accessor();
switch (fieldDescriptor.typeDescriptorId()) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueReader.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueReader.java
index cc5894a..2224b3e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueReader.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueReader.java
@@ -17,17 +17,17 @@
package org.apache.ignite.internal.network.serialization.marshal;
-import java.io.DataInputStream;
import java.io.IOException;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
import org.jetbrains.annotations.Nullable;
/**
- * Knows how to read a value from a {@link DataInputStream} and, optionally, knows the expected type (might be taken from
+ * Knows how to read a value from an input and, optionally, knows the expected type (might be taken from
* a field or be an array component type).
*/
interface TypedValueReader {
/**
- * Reads the next value from a {@link DataInputStream}.
+ * Reads the next value from an input.
*
* @param input from where to read
* @param declaredClass the original class of the object (i.e. {@code byte.class} for {@code byte}); might be {@code null}
@@ -37,6 +37,6 @@ interface TypedValueReader {
* @throws IOException if an I/O problem occurs
* @throws UnmarshalException if another problem (like {@link ClassNotFoundException}) occurs
*/
- Object read(DataInputStream input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
+ Object read(IgniteDataInput input, @Nullable Class<?> declaredClass, UnmarshallingContext context)
throws IOException, UnmarshalException;
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueWriter.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueWriter.java
index 4c89911..69f3a0d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/TypedValueWriter.java
@@ -17,17 +17,17 @@
package org.apache.ignite.internal.network.serialization.marshal;
-import java.io.DataOutputStream;
import java.io.IOException;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
import org.jetbrains.annotations.Nullable;
/**
- * Writes objects to a {@link DataOutputStream} taking their declared types into consideration (if any).
+ * Writes objects to an output taking their declared types into consideration (if any).
* The type might come from a field or be an array component type.
*/
interface TypedValueWriter {
/**
- * Writes the given object to the {@link DataOutputStream}.
+ * Writes the given object to the output.
*
* @param object object to write
* @param declaredClass the original class of the object (i.e. {@code byte.class} for {@code byte}); might be {@code null}
@@ -37,6 +37,6 @@ interface TypedValueWriter {
* @throws IOException if an I/O problem occurs
* @throws MarshalException if another problem occurs
*/
- void write(Object object, @Nullable Class<?> declaredClass, DataOutputStream output, MarshallingContext context)
+ void write(Object object, @Nullable Class<?> declaredClass, IgniteDataOutput output, MarshallingContext context)
throws IOException, MarshalException;
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UnmarshallingContext.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UnmarshallingContext.java
index ffa9f8e..9ac6e6f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UnmarshallingContext.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UnmarshallingContext.java
@@ -17,27 +17,26 @@
package org.apache.ignite.internal.network.serialization.marshal;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
import java.io.NotActiveException;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.ignite.internal.network.serialization.ClassDescriptor;
import org.apache.ignite.internal.network.serialization.DescriptorRegistry;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
import org.jetbrains.annotations.Nullable;
/**
* Context of unmarshalling act. Created once per unmarshalling a root object.
*/
class UnmarshallingContext implements DescriptorRegistry {
- private final ByteArrayInputStream source;
+ private final IgniteDataInput source;
private final DescriptorRegistry descriptors;
private final ClassLoader classLoader;
- private final Map<Integer, Object> idsToObjects = new HashMap<>();
+ private final Int2ObjectMap<Object> idsToObjects = new Int2ObjectOpenHashMap<>();
private final IntSet unsharedObjectIds = new IntOpenHashSet();
private Object objectCurrentlyReadWithReadObject;
@@ -45,7 +44,7 @@ class UnmarshallingContext implements DescriptorRegistry {
private UosObjectInputStream objectInputStream;
- public UnmarshallingContext(ByteArrayInputStream source, DescriptorRegistry descriptors, ClassLoader classLoader) {
+ public UnmarshallingContext(IgniteDataInput source, DescriptorRegistry descriptors, ClassLoader classLoader) {
this.source = source;
this.descriptors = descriptors;
this.classLoader = classLoader;
@@ -113,7 +112,7 @@ class UnmarshallingContext implements DescriptorRegistry {
source.mark(readAheadLimit);
}
- public void resetSourceToMark() {
+ public void resetSourceToMark() throws IOException {
source.reset();
}
@@ -144,7 +143,7 @@ class UnmarshallingContext implements DescriptorRegistry {
}
UosObjectInputStream objectInputStream(
- DataInputStream input,
+ IgniteDataInput input,
TypedValueReader valueReader,
TypedValueReader unsharedReader,
DefaultFieldsReaderWriter defaultFieldsReaderWriter
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosIgniteOutputStream.java
similarity index 60%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosIgniteOutputStream.java
index f7b2ede..dd8f8b8 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosIgniteOutputStream.java
@@ -17,21 +17,27 @@
package org.apache.ignite.internal.network.serialization.marshal;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput;
/**
- * Knows how to write a value to a {@link DataOutputStream}.
+ * {@link IgniteUnsafeDataOutput} extension that allows tracking whether it is occupied or not.
*/
-interface ValueWriter<T> {
- /**
- * Writes the given value to a {@link DataOutputStream}.
- *
- * @param value value to write
- * @param output where to write to
- * @param context marshalling context
- * @throws IOException if an I/O problem occurs
- * @throws MarshalException if another problem occurs
- */
- void write(T value, DataOutputStream output, MarshallingContext context) throws IOException, MarshalException;
+class UosIgniteOutputStream extends IgniteUnsafeDataOutput {
+ private boolean occupied = false;
+
+ public UosIgniteOutputStream(int size) {
+ super(size);
+ }
+
+ boolean isOccupied() {
+ return occupied;
+ }
+
+ void occupy() {
+ occupied = true;
+ }
+
+ void release() {
+ occupied = false;
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectInputStream.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectInputStream.java
index 3b89e7d..42eb272 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectInputStream.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectInputStream.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.network.serialization.marshal;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
@@ -26,13 +25,14 @@ import java.util.BitSet;
import org.apache.ignite.internal.network.serialization.ClassDescriptor;
import org.apache.ignite.internal.network.serialization.FieldDescriptor;
import org.apache.ignite.internal.network.serialization.Primitives;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
import org.jetbrains.annotations.Nullable;
/**
* {@link ObjectInputStream} specialization used by User Object Serialization.
*/
class UosObjectInputStream extends ObjectInputStream {
- private final DataInputStream input;
+ private final IgniteDataInput input;
private final TypedValueReader valueReader;
private final TypedValueReader unsharedReader;
private final DefaultFieldsReaderWriter defaultFieldsReaderWriter;
@@ -41,7 +41,7 @@ class UosObjectInputStream extends ObjectInputStream {
private UosGetField currentGet;
UosObjectInputStream(
- DataInputStream input,
+ IgniteDataInput input,
TypedValueReader valueReader,
TypedValueReader unsharedReader,
DefaultFieldsReaderWriter defaultFieldsReaderWriter,
@@ -73,6 +73,11 @@ class UosObjectInputStream extends ObjectInputStream {
return input.read(buf, off, len);
}
+ @Override
+ public byte[] readAllBytes() throws IOException {
+ return super.readAllBytes();
+ }
+
/** {@inheritDoc} */
@Override
public byte readByte() throws IOException {
@@ -279,7 +284,7 @@ class UosObjectInputStream extends ObjectInputStream {
/** {@inheritDoc} */
@Override
public boolean get(String name, boolean val) throws IOException {
- return Bits.getBoolean(primitiveFieldsData, primitiveFieldDataOffset(name, boolean.class));
+ return LittleEndianBits.getBoolean(primitiveFieldsData, primitiveFieldDataOffset(name, boolean.class));
}
/** {@inheritDoc} */
@@ -291,37 +296,37 @@ class UosObjectInputStream extends ObjectInputStream {
/** {@inheritDoc} */
@Override
public char get(String name, char val) throws IOException {
- return Bits.getChar(primitiveFieldsData, primitiveFieldDataOffset(name, char.class));
+ return LittleEndianBits.getChar(primitiveFieldsData, primitiveFieldDataOffset(name, char.class));
}
/** {@inheritDoc} */
@Override
public short get(String name, short val) throws IOException {
- return Bits.getShort(primitiveFieldsData, primitiveFieldDataOffset(name, short.class));
+ return LittleEndianBits.getShort(primitiveFieldsData, primitiveFieldDataOffset(name, short.class));
}
/** {@inheritDoc} */
@Override
public int get(String name, int val) throws IOException {
- return Bits.getInt(primitiveFieldsData, primitiveFieldDataOffset(name, int.class));
+ return LittleEndianBits.getInt(primitiveFieldsData, primitiveFieldDataOffset(name, int.class));
}
/** {@inheritDoc} */
@Override
public long get(String name, long val) throws IOException {
- return Bits.getLong(primitiveFieldsData, primitiveFieldDataOffset(name, long.class));
+ return LittleEndianBits.getLong(primitiveFieldsData, primitiveFieldDataOffset(name, long.class));
}
/** {@inheritDoc} */
@Override
public float get(String name, float val) throws IOException {
- return Bits.getFloat(primitiveFieldsData, primitiveFieldDataOffset(name, float.class));
+ return LittleEndianBits.getFloat(primitiveFieldsData, primitiveFieldDataOffset(name, float.class));
}
/** {@inheritDoc} */
@Override
public double get(String name, double val) throws IOException {
- return Bits.getDouble(primitiveFieldsData, primitiveFieldDataOffset(name, double.class));
+ return LittleEndianBits.getDouble(primitiveFieldsData, primitiveFieldDataOffset(name, double.class));
}
/** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectOutputStream.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectOutputStream.java
index 79dde97..d4ae13d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectOutputStream.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/UosObjectOutputStream.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.network.serialization.marshal;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.NotActiveException;
import java.io.ObjectOutput;
@@ -26,13 +25,14 @@ import java.util.BitSet;
import org.apache.ignite.internal.network.serialization.ClassDescriptor;
import org.apache.ignite.internal.network.serialization.FieldDescriptor;
import org.apache.ignite.internal.network.serialization.Primitives;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
import org.jetbrains.annotations.Nullable;
/**
* {@link ObjectOutputStream} specialization used by User Object Serialization.
*/
class UosObjectOutputStream extends ObjectOutputStream {
- private final DataOutputStream output;
+ private final IgniteDataOutput output;
private final TypedValueWriter valueWriter;
private final TypedValueWriter unsharedWriter;
private final DefaultFieldsReaderWriter defaultFieldsReaderWriter;
@@ -41,7 +41,7 @@ class UosObjectOutputStream extends ObjectOutputStream {
private UosPutField currentPut;
UosObjectOutputStream(
- DataOutputStream output,
+ IgniteDataOutput output,
TypedValueWriter valueWriter,
TypedValueWriter unsharedWriter, DefaultFieldsReaderWriter defaultFieldsReaderWriter,
MarshallingContext context
@@ -253,7 +253,7 @@ class UosObjectOutputStream extends ObjectOutputStream {
/** {@inheritDoc} */
@Override
public void put(String name, boolean val) {
- Bits.putBoolean(primitiveFieldsData, primitiveFieldDataOffset(name, boolean.class), val);
+ LittleEndianBits.putBoolean(primitiveFieldsData, primitiveFieldDataOffset(name, boolean.class), val);
}
/** {@inheritDoc} */
@@ -265,37 +265,37 @@ class UosObjectOutputStream extends ObjectOutputStream {
/** {@inheritDoc} */
@Override
public void put(String name, char val) {
- Bits.putChar(primitiveFieldsData, primitiveFieldDataOffset(name, char.class), val);
+ LittleEndianBits.putChar(primitiveFieldsData, primitiveFieldDataOffset(name, char.class), val);
}
/** {@inheritDoc} */
@Override
public void put(String name, short val) {
- Bits.putShort(primitiveFieldsData, primitiveFieldDataOffset(name, short.class), val);
+ LittleEndianBits.putShort(primitiveFieldsData, primitiveFieldDataOffset(name, short.class), val);
}
/** {@inheritDoc} */
@Override
public void put(String name, int val) {
- Bits.putInt(primitiveFieldsData, primitiveFieldDataOffset(name, int.class), val);
+ LittleEndianBits.putInt(primitiveFieldsData, primitiveFieldDataOffset(name, int.class), val);
}
/** {@inheritDoc} */
@Override
public void put(String name, long val) {
- Bits.putLong(primitiveFieldsData, primitiveFieldDataOffset(name, long.class), val);
+ LittleEndianBits.putLong(primitiveFieldsData, primitiveFieldDataOffset(name, long.class), val);
}
/** {@inheritDoc} */
@Override
public void put(String name, float val) {
- Bits.putFloat(primitiveFieldsData, primitiveFieldDataOffset(name, float.class), val);
+ LittleEndianBits.putFloat(primitiveFieldsData, primitiveFieldDataOffset(name, float.class), val);
}
/** {@inheritDoc} */
@Override
public void put(String name, double val) {
- Bits.putDouble(primitiveFieldsData, primitiveFieldDataOffset(name, double.class), val);
+ LittleEndianBits.putDouble(primitiveFieldsData, primitiveFieldDataOffset(name, double.class), val);
}
/** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueReader.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueReader.java
index 1ab25a1..240a575 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueReader.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueReader.java
@@ -17,15 +17,15 @@
package org.apache.ignite.internal.network.serialization.marshal;
-import java.io.DataInputStream;
import java.io.IOException;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
/**
- * Knows how to read a value from a {@link DataInputStream}.
+ * Knows how to read a value from an input.
*/
interface ValueReader<T> {
/**
- * Reads the next value from a {@link DataInputStream}.
+ * Reads the next value from an input.
*
* @param input from where to read
* @param context unmarshalling context
@@ -33,5 +33,5 @@ interface ValueReader<T> {
* @throws IOException if an I/O problem occurs
* @throws UnmarshalException if another problem (like {@link ClassNotFoundException}) occurs
*/
- T read(DataInputStream input, UnmarshallingContext context) throws IOException, UnmarshalException;
+ T read(IgniteDataInput input, UnmarshallingContext context) throws IOException, UnmarshalException;
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
index f7b2ede..a4ce6ee 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/marshal/ValueWriter.java
@@ -17,15 +17,15 @@
package org.apache.ignite.internal.network.serialization.marshal;
-import java.io.DataOutputStream;
import java.io.IOException;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
/**
- * Knows how to write a value to a {@link DataOutputStream}.
+ * Knows how to write a value to an output.
*/
interface ValueWriter<T> {
/**
- * Writes the given value to a {@link DataOutputStream}.
+ * Writes the given value to an output.
*
* @param value value to write
* @param output where to write to
@@ -33,5 +33,5 @@ interface ValueWriter<T> {
* @throws IOException if an I/O problem occurs
* @throws MarshalException if another problem occurs
*/
- void write(T value, DataOutputStream output, MarshallingContext context) throws IOException, MarshalException;
+ void write(T value, IgniteDataOutput output, MarshallingContext context) throws IOException, MarshalException;
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessage.java b/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessage.java
index a8d0504..4a190e0 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessage.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.network;
+import java.io.Serializable;
import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
@@ -30,7 +31,7 @@ import org.apache.ignite.network.annotations.Transferable;
* {@link NetworkMessage} implementation.
*/
@Transferable(TestMessageTypes.ALL_TYPES)
-public interface AllTypesMessage extends NetworkMessage {
+public interface AllTypesMessage extends NetworkMessage, Serializable {
byte byteA();
short shortB();
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessageGenerator.java b/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessageGenerator.java
index 883a051..43692e7 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessageGenerator.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/AllTypesMessageGenerator.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.network;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
import java.lang.reflect.Field;
import java.util.BitSet;
import java.util.Random;
import java.util.UUID;
-import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lang.IgniteUuidGenerator;
@@ -33,6 +35,10 @@ import org.jetbrains.annotations.Nullable;
* Generator for an {@link AllTypesMessage}.
*/
public class AllTypesMessageGenerator {
+ public static AllTypesMessage generate(long seed, boolean nestedMsg) {
+ return generate(seed, nestedMsg, true);
+ }
+
/**
* Generate a new {@link AllTypesMessage}.
*
@@ -40,7 +46,7 @@ public class AllTypesMessageGenerator {
* @param nestedMsg {@code true} if nested messages should be generated, {@code false} otherwise.
* @return Message.
*/
- public static AllTypesMessage generate(long seed, boolean nestedMsg) {
+ public static AllTypesMessage generate(long seed, boolean nestedMsg, boolean fillArrays) {
try {
var random = new Random(seed);
@@ -51,21 +57,23 @@ public class AllTypesMessageGenerator {
for (Field field : fields) {
field.setAccessible(true);
- field.set(message, randomValue(random, field, nestedMsg));
+ if (!field.getType().isArray() || fillArrays) {
+ field.set(message, randomValue(random, field, nestedMsg));
+ }
}
if (nestedMsg) {
message.netMsgArrV(
- IntStream.range(0, 10).mapToObj(unused -> generate(seed, false)).toArray(NetworkMessage[]::new)
+ IntStream.range(0, 10).mapToObj(unused -> generate(seed, false, fillArrays)).toArray(NetworkMessage[]::new)
);
message.netMsgCollW(IntStream.range(0, 10)
- .mapToObj(unused -> generate(seed, false))
- .collect(Collectors.toList()));
+ .mapToObj(unused -> generate(seed, false, fillArrays))
+ .collect(toList()));
message.newMsgMapX(IntStream.range(0, 10)
.boxed()
- .collect(Collectors.toMap(String::valueOf, unused -> generate(seed, false))));
+ .collect(toMap(String::valueOf, unused -> generate(seed, false, fillArrays))));
}
return message.build();
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/SerializationMicroBenchmark.java b/modules/network/src/test/java/org/apache/ignite/internal/network/SerializationMicroBenchmark.java
new file mode 100644
index 0000000..c4df17a
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/SerializationMicroBenchmark.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalException;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalledObject;
+import org.apache.ignite.internal.network.serialization.marshal.UnmarshalException;
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
+import org.jetbrains.annotations.Nullable;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * A micro-benchmark of {@link DefaultUserObjectMarshaller}.
+ *
+ * <p>Measures serialization and deserialization of a small, a medium and a large object with three mechanisms:
+ * JDK serialization, User Object Serialization (UOS, i.e. {@link UserObjectMarshaller}) and Kryo.
+ */
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(3)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@State(Scope.Thread)
+public class SerializationMicroBenchmark {
+
+    // Objects under test.
+    static TestClass smallObject;
+    static AllTypesMessage mediumObject;
+    static AllTypesMessage largeObject;
+
+    static ClassDescriptorRegistry registry;
+
+    static UserObjectMarshaller userObjectMarshaller;
+
+    static Kryo kryo;
+
+    // Pre-serialized forms of the objects above; they are the inputs of the deserialization benchmarks.
+    static byte[] smallSerializedWithUos;
+    static byte[] smallSerializedWithJava;
+    static byte[] smallSerializedWithKryo;
+
+    static byte[] mediumSerializedWithUos;
+    static byte[] mediumSerializedWithJava;
+    static byte[] mediumSerializedWithKryo;
+
+    static byte[] largeSerializedWithUos;
+    static byte[] largeSerializedWithJava;
+    static byte[] largeSerializedWithKryo;
+
+    static {
+        registry = new ClassDescriptorRegistry();
+        var factory = new ClassDescriptorFactory(registry);
+        userObjectMarshaller = new DefaultUserObjectMarshaller(registry, factory);
+
+        smallObject = new TestClass(1000, false, 3000);
+        // Same seed for both messages; they differ only in the fillArrays flag passed to the generator.
+        mediumObject = AllTypesMessageGenerator.generate(10, true, false);
+        largeObject = AllTypesMessageGenerator.generate(10, true, true);
+
+        factory.create(smallObject.getClass());
+        factory.create(largeObject.getClass());
+
+        kryo = new Kryo();
+        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+
+        try {
+            smallSerializedWithUos = userObjectMarshaller.marshal(smallObject).bytes();
+            mediumSerializedWithUos = userObjectMarshaller.marshal(mediumObject).bytes();
+            largeSerializedWithUos = userObjectMarshaller.marshal(largeObject).bytes();
+        } catch (MarshalException e) {
+            throw new RuntimeException(e);
+        }
+
+        smallSerializedWithJava = serializeWithJdk(smallObject);
+        mediumSerializedWithJava = serializeWithJdk(mediumObject);
+        largeSerializedWithJava = serializeWithJdk(largeObject);
+
+        smallSerializedWithKryo = serializeWithKryo(smallObject);
+        mediumSerializedWithKryo = serializeWithKryo(mediumObject);
+        largeSerializedWithKryo = serializeWithKryo(largeObject);
+
+        // Report the serialized sizes once, next to the timing results.
+        System.out.println("Small Java: " + smallSerializedWithJava.length);
+        System.out.println("Small UOS : " + smallSerializedWithUos.length);
+        System.out.println("Small Kryo: " + smallSerializedWithKryo.length);
+
+        System.out.println("Medium Java: " + mediumSerializedWithJava.length);
+        System.out.println("Medium UOS : " + mediumSerializedWithUos.length);
+        System.out.println("Medium Kryo: " + mediumSerializedWithKryo.length);
+
+        System.out.println("Large Java: " + largeSerializedWithJava.length);
+        System.out.println("Large UOS : " + largeSerializedWithUos.length);
+        System.out.println("Large Kryo: " + largeSerializedWithKryo.length);
+    }
+
+    /**
+     * Runs the benchmark.
+     *
+     * @param args args
+     * @throws Exception if something goes wrong
+     */
+    public static void main(String[] args) throws Exception {
+        Options build = new OptionsBuilder()
+                //.addProfiler("gc")
+                .include(SerializationMicroBenchmark.class.getName() + ".*").build();
+
+        new Runner(build).run();
+    }
+
+    /** Serializes the small object with JDK serialization. */
+    @Benchmark
+    public byte[] serialization_01_small_jdk() {
+        return serializeWithJdk(smallObject);
+    }
+
+    /** Serializes the medium object with JDK serialization. */
+    @Benchmark
+    public byte[] serialization_11_medium_jdk() {
+        return serializeWithJdk(mediumObject);
+    }
+
+    /** Serializes the large object with JDK serialization. */
+    @Benchmark
+    public byte[] serialization_21_large_jdk() {
+        return serializeWithJdk(largeObject);
+    }
+
+    /** Serializes the small object with UOS. */
+    @Benchmark
+    public void serialization_02_small_uos(Blackhole blackhole) {
+        benchmarkUosSerialization(smallObject, blackhole);
+    }
+
+    /** Serializes the medium object with UOS. */
+    @Benchmark
+    public void serialization_12_medium_uos(Blackhole blackhole) {
+        benchmarkUosSerialization(mediumObject, blackhole);
+    }
+
+    /** Serializes the large object with UOS. */
+    @Benchmark
+    public void serialization_22_large_uos(Blackhole blackhole) {
+        benchmarkUosSerialization(largeObject, blackhole);
+    }
+
+    /**
+     * Marshals the object with UOS and hands both the marshalling result and its bytes to the blackhole.
+     *
+     * @param obj object to marshal
+     * @param blackhole JMH blackhole consuming the results
+     */
+    private void benchmarkUosSerialization(Object obj, Blackhole blackhole) {
+        try {
+            MarshalledObject marshal = userObjectMarshaller.marshal(obj);
+
+            blackhole.consume(marshal);
+            blackhole.consume(marshal.bytes());
+        } catch (MarshalException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Serializes the small object with Kryo. */
+    @Benchmark
+    public byte[] serialization_03_small_kryo() {
+        return serializeWithKryo(smallObject);
+    }
+
+    /** Serializes the medium object with Kryo. */
+    @Benchmark
+    public byte[] serialization_13_medium_kryo() {
+        return serializeWithKryo(mediumObject);
+    }
+
+    /** Serializes the large object with Kryo. */
+    @Benchmark
+    public byte[] serialization_23_large_kryo() {
+        return serializeWithKryo(largeObject);
+    }
+
+    /** Deserializes the small object with JDK serialization. */
+    @Benchmark
+    public Object deserialization_01_small_jdk() {
+        return deserializeWithJava(SerializationMicroBenchmark.smallSerializedWithJava);
+    }
+
+    /** Deserializes the medium object with JDK serialization. */
+    @Benchmark
+    public Object deserialization_11_medium_jdk() {
+        return deserializeWithJava(SerializationMicroBenchmark.mediumSerializedWithJava);
+    }
+
+    /** Deserializes the large object with JDK serialization. */
+    @Benchmark
+    public Object deserialization_21_large_jdk() {
+        return deserializeWithJava(SerializationMicroBenchmark.largeSerializedWithJava);
+    }
+
+    /**
+     * Reads a single object from bytes produced by JDK serialization.
+     *
+     * @param serialized serialized form
+     * @return deserialized object
+     */
+    private Object deserializeWithJava(byte[] serialized) {
+        try (var bis = new ByteArrayInputStream(serialized); var ois = new ObjectInputStream(bis)) {
+            return ois.readObject();
+        } catch (IOException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Deserializes the small object with UOS. */
+    @Benchmark
+    public Object deserialization_02_small_uos() {
+        return deserializeWithUos(smallSerializedWithUos);
+    }
+
+    /** Deserializes the medium object with UOS. */
+    @Benchmark
+    public Object deserialization_12_medium_uos() {
+        return deserializeWithUos(mediumSerializedWithUos);
+    }
+
+    /** Deserializes the large object with UOS. */
+    @Benchmark
+    public Object deserialization_22_large_uos() {
+        return deserializeWithUos(largeSerializedWithUos);
+    }
+
+    /**
+     * Unmarshals bytes produced by the UOS marshaller, resolving descriptors through the shared registry.
+     *
+     * @param serialized marshalled form
+     * @return unmarshalled object
+     */
+    @Nullable
+    private Object deserializeWithUos(byte[] serialized) {
+        try {
+            return userObjectMarshaller.unmarshal(serialized, registry);
+        } catch (UnmarshalException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Deserializes the small object with Kryo. */
+    @Benchmark
+    public Object deserialization_03_small_kryo() {
+        return deserializeWithKryo(smallSerializedWithKryo);
+    }
+
+    /** Deserializes the medium object with Kryo. */
+    @Benchmark
+    public Object deserialization_13_medium_kryo() {
+        return deserializeWithKryo(mediumSerializedWithKryo);
+    }
+
+    /** Deserializes the large object with Kryo. */
+    @Benchmark
+    public Object deserialization_23_large_kryo() {
+        return deserializeWithKryo(largeSerializedWithKryo);
+    }
+
+    /**
+     * Reads a class-tagged object from bytes produced by {@link #serializeWithKryo(Object)}.
+     *
+     * @param serialized serialized form
+     * @return deserialized object
+     */
+    private Object deserializeWithKryo(byte[] serialized) {
+        ByteArrayInputStream stream = new ByteArrayInputStream(serialized);
+        try (Input input = new Input(stream)) {
+            return kryo.readClassAndObject(input);
+        }
+    }
+
+    /**
+     * Serializes the object with JDK serialization.
+     *
+     * @param obj object to serialize
+     * @return serialized form
+     */
+    private static byte[] serializeWithJdk(Object obj) {
+        try (var bos = new ByteArrayOutputStream(); var oos = new ObjectOutputStream(bos)) {
+            oos.writeObject(obj);
+
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Serializes the object (together with its class) with Kryo.
+     *
+     * @param obj object to serialize
+     * @return serialized form
+     */
+    private static byte[] serializeWithKryo(Object obj) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try (Output output = new Output(stream)) {
+            kryo.writeClassAndObject(output, obj);
+            // Push Kryo's buffered bytes into the stream before reading it; closing is left to try-with-resources.
+            output.flush();
+
+            return stream.toByteArray();
+        }
+    }
+
+    /**
+     * Small serializable object with three primitive fields.
+     */
+    static class TestClass implements Serializable {
+
+        private int foo;
+
+        private boolean bar;
+
+        private double foobar;
+
+        public TestClass() {
+        }
+
+        /**
+         * Creates an instance with all fields set.
+         *
+         * @param foo int value
+         * @param bar boolean value
+         * @param foobar double value
+         */
+        public TestClass(int foo, boolean bar, double foobar) {
+            this.foo = foo;
+            this.bar = bar;
+            this.foobar = foobar;
+        }
+    }
+
+}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/UosProfilerTarget.java b/modules/network/src/test/java/org/apache/ignite/internal/network/UosProfilerTarget.java
new file mode 100644
index 0000000..ce60d6d
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/UosProfilerTarget.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import java.util.Arrays;
+import java.util.Objects;
+import org.apache.ignite.internal.network.SerializationMicroBenchmark.TestClass;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalException;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalledObject;
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
+
+/**
+ * For running with a profiler.
+ *
+ * <p>The object to work on is marshalled once in the static initializer; {@link #main(String[])} then repeats
+ * unmarshalling (or marshalling, if switched by hand) of that object in a loop.
+ */
+public class UosProfilerTarget {
+
+    static Object obj;
+
+    static final ClassDescriptorRegistry registry = new ClassDescriptorRegistry();
+    static UserObjectMarshaller userObjectMarshaller;
+
+    static Class<?> clz;
+
+    static byte[] marshalledByUos;
+
+    static {
+        var factory = new ClassDescriptorFactory(registry);
+        userObjectMarshaller = new DefaultUserObjectMarshaller(registry, factory);
+        // Alternative profiling targets are kept commented out; enable exactly one assignment to obj.
+        // obj = AllTypesMessageGenerator.generate(10, true, true);
+        obj = AllTypesMessageGenerator.generate(10, true, false);
+        // obj = new TestClass(1000, false, 3000);
+        clz = obj.getClass();
+        factory.create(clz);
+        try {
+            MarshalledObject marshal = userObjectMarshaller.marshal(obj);
+            marshalledByUos = marshal.bytes();
+        } catch (MarshalException e) {
+            // Fail fast: continuing would leave marshalledByUos null and make deserialize() fail with an unrelated error.
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Runs the workload to be profiled.
+     *
+     * @param args ignored
+     * @throws Exception if marshalling or unmarshalling fails
+     */
+    public static void main(String[] args) throws Exception {
+        //serialize();
+        deserialize();
+    }
+
+    /** Number of loop iterations: many more for the tiny {@link TestClass} than for the message objects. */
+    private static int iterationCount() {
+        return (obj instanceof TestClass) ? 30_000_000 : 100_000;
+    }
+
+    /** Marshals the object repeatedly, folding the results into a printed value so that the work is observable. */
+    private static void serialize() throws Exception {
+        int accumulatedCode = 1;
+        int count = iterationCount();
+        for (int i = 0; i < count; i++) {
+            byte[] bytes = doMarshal(UosProfilerTarget.obj);
+            accumulatedCode ^= Arrays.hashCode(bytes);
+        }
+
+        System.out.println(accumulatedCode);
+    }
+
+    /** Single marshalling step, kept as a separate method. */
+    private static byte[] doMarshal(Object obj1) throws Exception {
+        MarshalledObject marshal = userObjectMarshaller.marshal(obj1);
+        return marshal.bytes();
+    }
+
+    /** Unmarshals the pre-marshalled bytes repeatedly, folding the results into a printed value. */
+    private static void deserialize() throws Exception {
+        int accumulatedCode = 1;
+        int count = iterationCount();
+        for (int i = 0; i < count; i++) {
+            Object object = doUnmarshal(marshalledByUos);
+            accumulatedCode ^= Objects.hashCode(object);
+        }
+
+        System.out.println(accumulatedCode);
+    }
+
+    /** Single unmarshalling step, kept as a separate method. */
+    private static Object doUnmarshal(byte[] bytes) throws Exception {
+        return userObjectMarshaller.unmarshal(bytes, registry);
+    }
+}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/BuiltInDescriptorsTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/BuiltInDescriptorsTest.java
index 9d5bd97..1ca38db 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/BuiltInDescriptorsTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/BuiltInDescriptorsTest.java
@@ -58,6 +58,7 @@ import static org.apache.ignite.internal.network.serialization.BuiltInType.SHORT
import static org.apache.ignite.internal.network.serialization.BuiltInType.SHORT_BOXED;
import static org.apache.ignite.internal.network.serialization.BuiltInType.SINGLETON_LIST;
import static org.apache.ignite.internal.network.serialization.BuiltInType.STRING;
+import static org.apache.ignite.internal.network.serialization.BuiltInType.STRING_LATIN1;
import static org.apache.ignite.internal.network.serialization.BuiltInType.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -114,5 +115,6 @@ public class BuiltInDescriptorsTest {
assertEquals(39, REFERENCE.descriptorId());
assertEquals(40, CLASS.descriptorId());
assertEquals(41, PROXY.descriptorId());
+ assertEquals(42, STRING_LATIN1.descriptorId());
}
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/ClassesTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/ClassesTest.java
index bdeb7f1..fc9878a 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/ClassesTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/ClassesTest.java
@@ -202,6 +202,16 @@ class ClassesTest {
assertFalse(Classes.isRuntimeTypeKnownUpfront(Enum[].class));
}
+    /** {@link String} must not be reported as a type whose runtime type is known upfront. */
+    @Test
+    void isValueTypeKnownUpfrontReturnsFalseForString() {
+        boolean knownUpfront = Classes.isRuntimeTypeKnownUpfront(String.class);
+
+        assertFalse(knownUpfront);
+    }
+
+    /** An array of {@link String} must not be reported as a type whose runtime type is known upfront. */
+    @Test
+    void isValueTypeKnownUpfrontReturnsFalseForArrayOfString() {
+        boolean knownUpfront = Classes.isRuntimeTypeKnownUpfront(String[].class);
+
+        assertFalse(knownUpfront);
+    }
+
private static class EmptySerializable implements Serializable {
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
index b3e403c..30c64ab 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
@@ -30,13 +30,13 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
+import it.unimi.dsi.fastutil.ints.IntSets;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Map;
import org.apache.ignite.internal.network.direct.DirectMessageWriter;
import org.apache.ignite.internal.network.netty.ConnectionManager;
@@ -192,7 +192,7 @@ public class MarshallableTest {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(object);
oos.close();
- return new MarshalledObject(baos.toByteArray(), Collections.singleton(descriptor));
+ return new MarshalledObject(baos.toByteArray(), IntSets.singleton(descriptor.descriptorId()));
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerCommonTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerCommonTest.java
index 3e2e036..a2254d7 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerCommonTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerCommonTest.java
@@ -18,12 +18,20 @@
package org.apache.ignite.internal.network.serialization.marshal;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
import java.util.Arrays;
+import java.util.List;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.DescriptorRegistry;
import org.junit.jupiter.api.Test;
/**
@@ -35,6 +43,9 @@ class DefaultUserObjectMarshallerCommonTest {
private final DefaultUserObjectMarshaller marshaller = new DefaultUserObjectMarshaller(descriptorRegistry, descriptorFactory);
+ static DefaultUserObjectMarshaller staticMarshaller;
+ static DescriptorRegistry staticRegistry;
+
@Test
void throwsOnExcessiveInputWhenUnmarshalling() throws Exception {
MarshalledObject marshalled = marshaller.marshal(null);
@@ -45,4 +56,67 @@ class DefaultUserObjectMarshallerCommonTest {
assertThat(ex.getMessage(), is("After reading a value, 1 excessive byte(s) still remain"));
}
+    /**
+     * Marshals the same list twice with the same marshaller: the second result must unmarshal back to the list
+     * and be byte-for-byte equal to the first result.
+     */
+    @Test
+    void previousInvocationsOfMarshallingInSameThreadDoNotHaveVisibleEffectsOnFollowingInvocations() throws Exception {
+        List<Integer> payload = List.of(1, 2, 3);
+
+        byte[] bytesFromFirstRun = marshaller.marshal(payload).bytes();
+        byte[] bytesFromSecondRun = marshaller.marshal(payload).bytes();
+
+        assertThat(marshaller.unmarshal(bytesFromSecondRun, descriptorRegistry), is(equalTo(payload)));
+        assertThat(bytesFromSecondRun, is(equalTo(bytesFromFirstRun)));
+    }
+
+    /**
+     * Round-trips an object whose {@code writeObject()}/{@code readObject()} hooks use the marshaller themselves
+     * and checks that the outer round-trip still yields the original value.
+     */
+    @Test
+    void nestedMarshallingUnmarshallingInsideWriteReadObjectDoNotInterfereWithOutsideMarshallingUnmarshalling() throws Exception {
+        // The serialization hooks of WithNestedMarshalling are reached through these static references.
+        staticMarshaller = marshaller;
+        staticRegistry = descriptorRegistry;
+
+        WithNestedMarshalling original = new WithNestedMarshalling(42);
+
+        WithNestedMarshalling roundTripped = marshalAndUnmarshalNotNull(original);
+
+        assertThat(roundTripped.value, is(42));
+    }
+
+    /**
+     * Marshals the object, unmarshals the resulting bytes and asserts that the outcome is not {@code null}.
+     *
+     * @param object object to round-trip
+     * @param <T> expected type of the unmarshalled object
+     * @return the unmarshalled object
+     */
+    private <T> T marshalAndUnmarshalNotNull(Object object) throws MarshalException, UnmarshalException {
+        byte[] bytes = marshaller.marshal(object).bytes();
+
+        T result = marshaller.unmarshal(bytes, descriptorRegistry);
+        assertThat(result, is(notNullValue()));
+
+        return result;
+    }
+
+    /**
+     * Serializable class whose {@code writeObject()} and {@code readObject()} hooks perform an unrelated
+     * marshalling (and, on read, unmarshalling) through {@code staticMarshaller} before delegating to the
+     * default field serialization.
+     */
+    private static class WithNestedMarshalling implements Serializable {
+        private final int value;
+
+        private WithNestedMarshalling(int value) {
+            this.value = value;
+        }
+
+        private void writeObject(ObjectOutputStream stream) throws IOException {
+            // The bytes are deliberately discarded: only the fact that a nested marshal() ran matters here.
+            marshalSomethingElse();
+
+            stream.defaultWriteObject();
+        }
+
+        /** Marshals an unrelated list with the shared marshaller, rethrowing failures unchecked. */
+        private byte[] marshalSomethingElse() {
+            try {
+                return staticMarshaller.marshal(List.of(1, 2, 3)).bytes();
+            } catch (MarshalException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
+            byte[] marshalledBytes = marshalSomethingElse();
+            try {
+                staticMarshaller.unmarshal(marshalledBytes, staticRegistry);
+            } catch (UnmarshalException e) {
+                throw new RuntimeException(e);
+            }
+
+            stream.defaultReadObject();
+        }
+    }
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest.java
index 6bd22e9..94ca19d 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest.java
@@ -23,8 +23,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
+import java.io.DataInput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -32,7 +31,8 @@ import java.io.Serializable;
import org.apache.ignite.internal.network.serialization.BuiltInType;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
import org.junit.jupiter.api.Test;
class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
@@ -48,23 +48,22 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void writesOnlyThePrimitiveValueForPrimitiveFields() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new WithPrimitiveField((byte) 123));
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
assertThat(dis.readAllBytes(), is(new byte[]{123}));
}
- private DataInputStream openDataStreamAndSkipRootObjectHeader(MarshalledObject marshalled) throws IOException {
- DataInputStream dis = openDataStream(marshalled);
+ private IgniteDataInput openDataStreamAndSkipRootObjectHeader(MarshalledObject marshalled) throws IOException {
+ IgniteDataInput dis = openDataStream(marshalled);
skipTillRootObjectData(dis);
return dis;
}
- @NotNull
- private DataInputStream openDataStream(MarshalledObject marshalled) {
- return new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+ private IgniteDataInput openDataStream(MarshalledObject marshalled) {
+ return new IgniteUnsafeDataInput(marshalled.bytes());
}
- private void skipTillRootObjectData(DataInputStream dos) throws IOException {
+ private void skipTillRootObjectData(DataInput dos) throws IOException {
ProtocolMarshalling.readDescriptorOrCommandId(dos);
ProtocolMarshalling.readObjectId(dos);
}
@@ -73,7 +72,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void treatsPrimitiveArrayFieldsAsFieldsWithTypeKnownUpfront() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new WithPrimitiveArrayField(new byte[]{123}));
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
skipOneByteEmptyNullBitMask(dis);
assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
@@ -85,7 +84,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void doesNotWriteNullsBitmapForPrimitiveArrays() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new byte[]{123});
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
assertThat(ProtocolMarshalling.readLength(dis), is(1));
assertThat(dis.readAllBytes(), is(new byte[]{123}));
@@ -95,7 +94,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void writesNullsBitmapAndNoDescriptorIdForNonNullPrimitiveWrapperFields() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new WithPrimitiveWrapperField((byte) 123));
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
skipOneByteEmptyNullBitMask(dis);
assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
@@ -103,7 +102,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
assertThat(dis.readAllBytes(), is(new byte[]{123}));
}
- private void skipOneByteEmptyNullBitMask(DataInputStream dis) throws IOException {
+ private void skipOneByteEmptyNullBitMask(DataInput dis) throws IOException {
assertThat(dis.readByte(), is((byte) 0));
}
@@ -111,7 +110,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void writesOnlyNullsBitmapForNullPrimitiveWrapperFields() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new WithPrimitiveWrapperField(null));
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
final byte nullsBitmapWithOneBitSet = 1;
assertThat(dis.readAllBytes(), is(new byte[]{nullsBitmapWithOneBitSet}));
@@ -121,7 +120,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void writesNullsBitmapAndNoDescriptorIdForNonNullValueOfFieldWhichTypeIsKnownUpfront() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new WithFieldOfTypeKnownUpfront(new FinalClass((byte) 123)));
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
skipOneByteEmptyNullBitMask(dis);
assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
@@ -133,7 +132,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void writesOnlyNullsBitmapForNullValueOfFieldWhichTypeIsKnownUpfront() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new WithFieldOfTypeKnownUpfront(null));
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
final byte nullsBitmapWithOneBitSet = (byte) 1;
assertThat(dis.readAllBytes(), is(new byte[]{nullsBitmapWithOneBitSet}));
@@ -143,7 +142,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void writesDescriptorIdAndObjectIdAndTheValueForNonNullValueOfFieldWhichTypeIsNotKnownUpfront() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new WithFieldOfTypeNotKnownUpfront(new NonFinalClass((byte) 123)));
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
assertThat(ProtocolMarshalling.readDescriptorOrCommandId(dis), greaterThanOrEqualTo(MIN_CUSTOM_DESCRIPTOR_ID));
assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
@@ -155,7 +154,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void writesOnlyNullMarkerForNullValueOfFieldWhichTypeIsNotKnownUpfront() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new WithFieldOfTypeNotKnownUpfront(null));
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
assertThat(dis.readAllBytes(), is(new byte[]{(byte) BuiltInType.NULL.descriptorId()}));
}
@@ -166,7 +165,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
new WithArrayFieldOfTypeKnownUpfront(new FinalClass[]{new FinalClass((byte) 123)})
);
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
// beginning of array of FinalClass
skipOneByteEmptyNullBitMask(dis);
@@ -188,7 +187,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
new WithArrayFieldOfTypeNotKnownUpfront(new NonFinalClass[]{new NonFinalClass((byte) 123)})
);
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
// beginning of array of NonFinalClass
assertThat(ProtocolMarshalling.readDescriptorOrCommandId(dis), greaterThanOrEqualTo(MIN_CUSTOM_DESCRIPTOR_ID));
@@ -208,7 +207,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void writesNullsBitmapAndNoDescriptorIdForNonNullValueOfSimpleEnumField() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new WithSimpleEnumField(SimpleEnum.FIRST));
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
skipOneByteEmptyNullBitMask(dis);
assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
@@ -220,7 +219,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void writesNullsBitmapAndNoDescriptorIdForNonNullValueOfEnumWithAnonClassForMemberField() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new WithEnumWithAnonClassesForMembersField(EnumWithAnonClassesForMembers.FIRST));
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
skipOneByteEmptyNullBitMask(dis);
assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
@@ -232,7 +231,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void writesNullsBitmapAndNoDescriptorIdForNonNullSimpleEnumArrayElement() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new SimpleEnum[]{SimpleEnum.FIRST});
- DataInputStream dis = openDataStream(marshalled);
+ IgniteDataInput dis = openDataStream(marshalled);
assertThat(ProtocolMarshalling.readDescriptorOrCommandId(dis), greaterThanOrEqualTo(MIN_CUSTOM_DESCRIPTOR_ID));
ProtocolMarshalling.readObjectId(dis);
@@ -249,7 +248,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void writesNullsBitmapAndNoDescriptorIdForNonNullEnumWithAnonClassForMemberArrayElement() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new EnumWithAnonClassesForMembers[]{EnumWithAnonClassesForMembers.FIRST});
- DataInputStream dis = openDataStream(marshalled);
+ IgniteDataInput dis = openDataStream(marshalled);
assertThat(ProtocolMarshalling.readDescriptorOrCommandId(dis), greaterThanOrEqualTo(MIN_CUSTOM_DESCRIPTOR_ID));
ProtocolMarshalling.readObjectId(dis);
@@ -266,7 +265,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void writesFullDescriptorIdsForEnumsInAbstractEnumArray() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new Enum[]{SimpleEnum.FIRST});
- DataInputStream dis = openDataStream(marshalled);
+ IgniteDataInput dis = openDataStream(marshalled);
assertThat(ProtocolMarshalling.readDescriptorOrCommandId(dis), greaterThanOrEqualTo(MIN_CUSTOM_DESCRIPTOR_ID));
ProtocolMarshalling.readObjectId(dis);
@@ -283,7 +282,7 @@ class DefaultUserObjectMarshallerConcreteTypesKnownUpfrontOptimizationTest {
void supportsFinalClassOptimizationInPutFields() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new WithPutFieldsReadFields());
- DataInputStream dis = openDataStreamAndSkipRootObjectHeader(marshalled);
+ IgniteDataInput dis = openDataStreamAndSkipRootObjectHeader(marshalled);
skipOneByteEmptyNullBitMask(dis);
assertThat(ProtocolMarshalling.readObjectId(dis), is(SECOND_USER_OBJECT_ID));
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithArbitraryObjectsTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithArbitraryObjectsTest.java
index 29e094f..26a9852 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithArbitraryObjectsTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithArbitraryObjectsTest.java
@@ -94,9 +94,9 @@ class DefaultUserObjectMarshallerWithArbitraryObjectsTest {
void marshalsArbitraryObjectsUsingDescriptorsOfThemAndTheirContents() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new Simple(42));
- assertThat(marshalled.usedDescriptors(), equalTo(Set.of(
- descriptorRegistry.getRequiredDescriptor(Simple.class),
- descriptorRegistry.getBuiltInDescriptor(BuiltInType.INT)
+ assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(
+ descriptorRegistry.getRequiredDescriptor(Simple.class).descriptorId(),
+ descriptorRegistry.getBuiltInDescriptor(BuiltInType.INT).descriptorId()
)));
}
@@ -132,9 +132,9 @@ class DefaultUserObjectMarshallerWithArbitraryObjectsTest {
void usesDescriptorsOfAllAncestors() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new Child("answer", 42));
- assertThat(marshalled.usedDescriptors(), hasItems(
- descriptorRegistry.getRequiredDescriptor(Parent.class),
- descriptorRegistry.getRequiredDescriptor(Child.class)
+ assertThat(marshalled.usedDescriptorIds(), hasItems(
+ descriptorRegistry.getRequiredDescriptor(Parent.class).descriptorId(),
+ descriptorRegistry.getRequiredDescriptor(Child.class).descriptorId()
));
}
@@ -532,6 +532,27 @@ class DefaultUserObjectMarshallerWithArbitraryObjectsTest {
return Thread.currentThread().getContextClassLoader();
}
+ @Test
+ void asciiInStringFieldIsSupported() throws Exception {
+ WithStringField unmarshalled = marshalAndUnmarshalNonNull(new WithStringField("a"));
+
+ assertThat(unmarshalled.value, is("a"));
+ }
+
+ @Test
+ void nonAsciiLatin1InStringFieldIsSupported() throws Exception {
+ WithStringField unmarshalled = marshalAndUnmarshalNonNull(new WithStringField("é"));
+
+ assertThat(unmarshalled.value, is("é"));
+ }
+
+ @Test
+ void nonLatin1InStringFieldIsSupported() throws Exception {
+ WithStringField unmarshalled = marshalAndUnmarshalNonNull(new WithStringField("щ"));
+
+ assertThat(unmarshalled.value, is("щ"));
+ }
+
private static boolean noArgs(Method method) {
return method.getParameterTypes().length == 0;
}
@@ -758,4 +779,12 @@ class DefaultUserObjectMarshallerWithArbitraryObjectsTest {
throw new RuntimeException("Don't know how to handle " + method + " with args" + Arrays.toString(args));
}
}
+
+ private static class WithStringField {
+ private final String value;
+
+ private WithStringField(String value) {
+ this.value = value;
+ }
+ }
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithBuiltinsTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithBuiltinsTest.java
index 0ea20fb..1665b8c 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithBuiltinsTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithBuiltinsTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import java.io.ByteArrayInputStream;
+import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.math.BigDecimal;
@@ -53,6 +54,8 @@ import org.apache.ignite.internal.network.serialization.BuiltInType;
import org.apache.ignite.internal.network.serialization.ClassDescriptor;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
import org.apache.ignite.lang.IgniteUuid;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -89,7 +92,8 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
void marshalsBareObjectUsingOnlyBareObjectDescriptor() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new Object());
- assertThat(marshalled.usedDescriptors(), equalTo(Set.of(descriptorRegistry.getRequiredDescriptor(Object.class))));
+ ClassDescriptor expectedDescriptor = descriptorRegistry.getRequiredDescriptor(Object.class);
+ assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(expectedDescriptor.descriptorId())));
}
@Test
@@ -109,10 +113,10 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
void marshalsObjectArrayUsingExactlyDescriptorsOfObjectArrayAndComponents() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new Object[]{42, "abc"});
- assertThat(marshalled.usedDescriptors(), containsInAnyOrder(
- descriptorRegistry.getRequiredDescriptor(Object[].class),
- descriptorRegistry.getRequiredDescriptor(Integer.class),
- descriptorRegistry.getRequiredDescriptor(String.class)
+ assertThat(marshalled.usedDescriptorIds(), containsInAnyOrder(
+ descriptorRegistry.getRequiredDescriptor(Object[].class).descriptorId(),
+ descriptorRegistry.getRequiredDescriptor(Integer.class).descriptorId(),
+ BuiltInType.STRING_LATIN1.descriptorId()
));
}
@@ -134,16 +138,16 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
void marshalsSimpleEnumsUsingOnlyEnumClassDescriptor() throws Exception {
MarshalledObject marshalled = marshaller.marshal(SimpleEnum.FIRST);
- assertThat(marshalled.usedDescriptors(), equalTo(Set.of(descriptorRegistry.getRequiredDescriptor(SimpleEnum.class))));
+ ClassDescriptor expectedDescriptor = descriptorRegistry.getRequiredDescriptor(SimpleEnum.class);
+ assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(expectedDescriptor.descriptorId())));
}
@Test
void marshalsEnumsWithAnonClassesForMembersUsingOnlyEnumClassDescriptor() throws Exception {
MarshalledObject marshalled = marshaller.marshal(EnumWithAnonClassesForMembers.FIRST);
- assertThat(marshalled.usedDescriptors(),
- equalTo(Set.of(descriptorRegistry.getRequiredDescriptor(EnumWithAnonClassesForMembers.class)))
- );
+ ClassDescriptor expectedDescriptor = descriptorRegistry.getRequiredDescriptor(EnumWithAnonClassesForMembers.class);
+ assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(expectedDescriptor.descriptorId())));
}
@Test
@@ -166,7 +170,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
void marshalsSimpleEnumInCorrectFormat() throws Exception {
MarshalledObject marshalled = marshaller.marshal(SimpleEnum.FIRST);
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+ IgniteDataInput dis = openDataInput(marshalled);
int descriptorId = ProtocolMarshalling.readDescriptorOrCommandId(dis);
assertThat(descriptorRegistry.getRequiredDescriptor(descriptorId).clazz(), is(SimpleEnum.class));
@@ -174,13 +178,14 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
ProtocolMarshalling.readObjectId(dis);
assertThat(dis.readUTF(), is("FIRST"));
+ assertThat(dis.available(), is(0));
}
@Test
void marshalsEnumWithAnonClassesForMembersInCorrectFormat() throws Exception {
MarshalledObject marshalled = marshaller.marshal(EnumWithAnonClassesForMembers.FIRST);
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+ IgniteDataInput dis = openDataInput(marshalled);
int descriptorId = ProtocolMarshalling.readDescriptorOrCommandId(dis);
assertThat(descriptorRegistry.getRequiredDescriptor(descriptorId).clazz(), is(EnumWithAnonClassesForMembers.class));
@@ -188,6 +193,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
ProtocolMarshalling.readObjectId(dis);
assertThat(dis.readUTF(), is("FIRST"));
+ assertThat(dis.available(), is(0));
}
@ParameterizedTest
@@ -210,7 +216,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
MarshalledObject marshalled = marshaller.marshal(typeValue.value);
ClassDescriptor expectedDescriptor = descriptorRegistry.getBuiltInDescriptor(typeValue.builtinType);
- assertThat(marshalled.usedDescriptors(), equalTo(Set.of(expectedDescriptor)));
+ assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(expectedDescriptor.descriptorId())));
}
static Stream<Arguments> builtInNonCollectionTypes() {
@@ -224,7 +230,8 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
builtInTypeValue(true, BuiltInType.BOOLEAN_BOXED),
builtInTypeValue('a', BuiltInType.CHAR_BOXED),
// BARE_OBJECT is handled separately
- builtInTypeValue("abc", BuiltInType.STRING),
+ builtInTypeValue("abc", BuiltInType.STRING_LATIN1),
+ builtInTypeValue("Привет", BuiltInType.STRING),
builtInTypeValue(UUID.fromString("c6f57d4a-619f-11ec-add6-73bc97c3c49e"), BuiltInType.UUID),
builtInTypeValue(IgniteUuid.fromString("1234-c6f57d4a-619f-11ec-add6-73bc97c3c49e"),
BuiltInType.IGNITE_UUID),
@@ -235,7 +242,8 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
builtInTypeValue(new float[]{1.0f, 2.0f, 3.0f}, BuiltInType.FLOAT_ARRAY),
builtInTypeValue(new long[]{1, 2, 3}, BuiltInType.LONG_ARRAY),
builtInTypeValue(new double[]{1.0, 2.0, 3.0}, BuiltInType.DOUBLE_ARRAY),
- builtInTypeValue(new boolean[]{true, false}, BuiltInType.BOOLEAN_ARRAY),
+ builtInTypeValue(new boolean[]{true, false, true}, BuiltInType.BOOLEAN_ARRAY),
+ builtInTypeValue(new boolean[]{true, false, true, false, true, false, true, false, true}, BuiltInType.BOOLEAN_ARRAY),
builtInTypeValue(new char[]{'a', 'b'}, BuiltInType.CHAR_ARRAY),
builtInTypeValue(new BigDecimal(42), BuiltInType.DECIMAL),
builtInTypeValue(BitSet.valueOf(new long[]{42, 43}), BuiltInType.BIT_SET),
@@ -269,9 +277,9 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
void marshalsUsingOnlyCorrespondingDescriptorsForBuiltInCollectionTypes(BuiltInTypeValue typeValue) throws Exception {
MarshalledObject marshalled = marshaller.marshal(typeValue.value);
- assertThat(marshalled.usedDescriptors(), containsInAnyOrder(
- descriptorRegistry.getBuiltInDescriptor(typeValue.builtinType),
- descriptorRegistry.getBuiltInDescriptor(BuiltInType.INT_BOXED)
+ assertThat(marshalled.usedDescriptorIds(), containsInAnyOrder(
+ typeValue.builtinType.descriptorId(),
+ BuiltInType.INT_BOXED.descriptorId()
));
}
@@ -322,7 +330,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
ClassDescriptor descriptor = descriptorRegistry.getRequiredDescriptor(array.getClass());
- assertThat(marshalled.usedDescriptors(), hasItem(descriptor));
+ assertThat(marshalled.usedDescriptorIds(), hasItem(descriptor.descriptorId()));
}
@ParameterizedTest
@@ -410,7 +418,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
void marshalsSimpleEnumArrayCorrectly() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new SimpleEnum[]{SimpleEnum.FIRST});
- var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+ IgniteDataInput dis = openDataInput(marshalled);
ProtocolMarshalling.readDescriptorOrCommandId(dis);
ProtocolMarshalling.readObjectId(dis);
@@ -428,7 +436,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
void marshalsEnumArrayWithValuesOfSimpleEnumCorrectly() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new Enum[]{SimpleEnum.FIRST});
- var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+ IgniteDataInput dis = openDataInput(marshalled);
ProtocolMarshalling.readDescriptorOrCommandId(dis);
ProtocolMarshalling.readObjectId(dis);
@@ -446,7 +454,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
void marshalsArrayOfEnumWithAnonClassesForMembersCorrectly() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new EnumWithAnonClassesForMembers[]{EnumWithAnonClassesForMembers.FIRST});
- var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+ IgniteDataInput dis = openDataInput(marshalled);
ProtocolMarshalling.readDescriptorOrCommandId(dis);
ProtocolMarshalling.readObjectId(dis);
@@ -460,7 +468,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
assertThat(dis.available(), is(0));
}
- private void skipOneByteEmptyNullBitMask(DataInputStream dis) throws IOException {
+ private void skipOneByteEmptyNullBitMask(DataInput dis) throws IOException {
assertThat(dis.readByte(), is((byte) 0));
}
@@ -468,7 +476,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
void marshalsEnumArrayWithValuesOfEnumWithAnonClassesForMembersValuesCorrectly() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new Enum[]{EnumWithAnonClassesForMembers.FIRST});
- var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+ IgniteDataInput dis = openDataInput(marshalled);
ProtocolMarshalling.readDescriptorOrCommandId(dis);
ProtocolMarshalling.readObjectId(dis);
@@ -486,7 +494,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
void marshalsEmptyAbstractEnumArrayCorrectly() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new Enum[]{});
- var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+ IgniteDataInput dis = openDataInput(marshalled);
ProtocolMarshalling.readDescriptorOrCommandId(dis);
ProtocolMarshalling.readObjectId(dis);
@@ -497,11 +505,15 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
assertThat(dis.available(), is(0));
}
+ private IgniteDataInput openDataInput(MarshalledObject marshalled) {
+ return new IgniteUnsafeDataInput(marshalled.bytes());
+ }
+
@Test
void marshalsEmptySimpleEnumArrayCorrectly() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new SimpleEnum[]{});
- var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+ IgniteDataInput dis = openDataInput(marshalled);
ProtocolMarshalling.readDescriptorOrCommandId(dis);
ProtocolMarshalling.readObjectId(dis);
@@ -516,7 +528,7 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
void marshalsEmptyArrayOfEnumWithAnonClassesForMembersCorrectly() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new EnumWithAnonClassesForMembers[]{});
- var dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+ IgniteDataInput dis = openDataInput(marshalled);
ProtocolMarshalling.readDescriptorOrCommandId(dis);
ProtocolMarshalling.readObjectId(dis);
@@ -527,6 +539,22 @@ class DefaultUserObjectMarshallerWithBuiltinsTest {
assertThat(dis.available(), is(0));
}
+ @ParameterizedTest
+ @MethodSource("testStrings")
+ void marshalsAndUnmarshalsStrings(String str) throws Exception {
+ String unmarshalled = marshalAndUnmarshal(str);
+
+ assertThat(unmarshalled, is(equalTo(str)));
+ }
+
+ private static Stream<Arguments> testStrings() {
+ return Stream.of(
+ "ASCII",
+ "Not ASCII, but Latin-1: é",
+ "Ютиэф-8"
+ ).map(Arguments::of);
+ }
+
private static class BuiltInTypeValue {
private final Object value;
private final BuiltInType builtinType;
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithExternalizableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithExternalizableTest.java
index e7f33a0..e088eb1 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithExternalizableTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithExternalizableTest.java
@@ -62,9 +62,8 @@ class DefaultUserObjectMarshallerWithExternalizableTest {
void usesExactlyOneDescriptorWhenMarshallingExternalizable() throws Exception {
MarshalledObject marshalled = marshaller.marshal(new SimpleExternalizable(42));
- ClassDescriptor expectedDescriptor = descriptorRegistry.getDescriptor(SimpleExternalizable.class);
- assertThat(expectedDescriptor, is(notNullValue()));
- assertThat(marshalled.usedDescriptors(), is(equalTo(Set.of(expectedDescriptor))));
+ ClassDescriptor expectedDescriptor = descriptorRegistry.getRequiredDescriptor(SimpleExternalizable.class);
+ assertThat(marshalled.usedDescriptorIds(), is(equalTo(Set.of(expectedDescriptor.descriptorId()))));
}
@Test
@@ -113,7 +112,7 @@ class DefaultUserObjectMarshallerWithExternalizableTest {
MarshalledObject marshalled = marshaller.marshal(new ExternalizableWithReplaceWithSimple(42));
ClassDescriptor replacementDescriptor = descriptorRegistry.getRequiredDescriptor(SimpleExternalizable.class);
- assertThat(marshalled.usedDescriptors(), equalTo(Set.of(replacementDescriptor)));
+ assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(replacementDescriptor.descriptorId())));
}
@Test
@@ -130,7 +129,7 @@ class DefaultUserObjectMarshallerWithExternalizableTest {
MarshalledObject marshalled = marshaller.marshal(new ExternalizableWithReplaceWithNull(42));
ClassDescriptor replacementDescriptor = descriptorRegistry.getNullDescriptor();
- assertThat(marshalled.usedDescriptors(), equalTo(Set.of(replacementDescriptor)));
+ assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(replacementDescriptor.descriptorId())));
}
@Test
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest.java
index 916edaf..edd3c8a 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest.java
@@ -34,10 +34,8 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidObjectException;
@@ -50,6 +48,8 @@ import java.util.Objects;
import java.util.stream.Stream;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -124,7 +124,7 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
}
private byte[] readOverrideBytes(MarshalledObject marshalled) throws IOException {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(marshalled.bytes()));
+ IgniteDataInput dis = new IgniteUnsafeDataInput(marshalled.bytes());
ProtocolMarshalling.readDescriptorOrCommandId(dis);
ProtocolMarshalling.readObjectId(dis);
@@ -143,7 +143,7 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
new ReadWriteSpec<>("double", oos -> oos.writeDouble(42.0), DataInput::readDouble, 42.0),
new ReadWriteSpec<>("char", oos -> oos.writeChar('a'), DataInput::readChar, 'a'),
new ReadWriteSpec<>("boolean", oos -> oos.writeBoolean(true), DataInput::readBoolean, true),
- new ReadWriteSpec<>("stream byte", oos -> oos.write(42), ObjectInputStream::read, DataInputStream::read, 42),
+ new ReadWriteSpec<>("stream byte", oos -> oos.write(42), ObjectInputStream::read, IgniteDataInput::read, 42),
new ReadWriteSpec<>(
"byte array",
oos -> oos.write(new byte[]{42, 43}),
@@ -197,7 +197,7 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
"readAllBytes",
oos -> oos.write(new byte[]{42, 43}),
InputStream::readAllBytes,
- InputStream::readAllBytes,
+ IgniteDataInput::readAllBytes,
new byte[]{42, 43}
),
new ReadWriteSpec<>(
@@ -226,6 +226,14 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
}
@SuppressWarnings("SameParameterValue")
+ private static byte[] readBytes(IgniteDataInput is, int count) throws IOException {
+ byte[] bytes = new byte[count];
+ int read = is.read(bytes);
+ assertThat(read, is(count));
+ return bytes;
+ }
+
+ @SuppressWarnings("SameParameterValue")
private static byte[] readRange(InputStream is, int count) throws IOException {
byte[] bytes = new byte[count];
int read = is.read(bytes, 0, count);
@@ -233,6 +241,14 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
return bytes;
}
+ @SuppressWarnings("SameParameterValue")
+ private static byte[] readRange(IgniteDataInput is, int count) throws IOException {
+ byte[] bytes = new byte[count];
+ int read = is.read(bytes, 0, count);
+ assertThat(read, is(count));
+ return bytes;
+ }
+
private static byte[] readBytesFully(DataInput is, int count) throws IOException {
byte[] bytes = new byte[count];
is.readFully(bytes);
@@ -253,9 +269,16 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
return bytes;
}
- private static <T> T consumeAndUnmarshal(DataInputStream stream) throws IOException {
+ @SuppressWarnings("SameParameterValue")
+ private static byte[] readNumBytesRange(IgniteDataInput is, int count) throws IOException {
+ byte[] bytes = new byte[count];
+ is.readFewBytes(bytes, 0, count);
+ return bytes;
+ }
+
+ private static <T> T consumeAndUnmarshal(IgniteDataInput stream) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- stream.transferTo(baos);
+ baos.write(stream.readAllBytes());
try {
return staticMarshaller.unmarshal(baos.toByteArray(), staticDescriptorRegistry);
@@ -585,13 +608,13 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
MarshalledObject marshalled = marshaller.marshal(original);
byte[] overrideBytes = readOverrideBytes(marshalled);
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(overrideBytes));
+ var dis = new IgniteUnsafeDataInput(overrideBytes);
assertThat(dis.readInt(), is(42));
assertThatDrained(dis);
}
- private void assertThatDrained(DataInputStream dis) throws IOException {
+ private void assertThatDrained(InputStream dis) throws IOException {
assertThat("Stream is not drained", dis.read(), is(lessThan(0)));
}
@@ -630,7 +653,7 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
}
private interface DataReader<T> {
- T readFrom(DataInputStream stream) throws IOException;
+ T readFrom(IgniteDataInput stream) throws IOException;
}
private interface InputReader<T> {
@@ -667,7 +690,7 @@ class DefaultUserObjectMarshallerWithSerializableOverrideStreamsTest {
}
private T parseOverrideValue(byte[] overrideBytes) throws IOException {
- DataInputStream dis = new DataInputStream(new ByteArrayInputStream(overrideBytes));
+ IgniteDataInput dis = new IgniteUnsafeDataInput(overrideBytes);
return dataReader.readFrom(dis);
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableTest.java
index 1541413..072b178 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/marshal/DefaultUserObjectMarshallerWithSerializableTest.java
@@ -111,10 +111,10 @@ class DefaultUserObjectMarshallerWithSerializableTest {
MarshalledObject marshalled = marshaller.marshal(new SerializableWithReplaceWithSimple(42));
ClassDescriptor originalDescriptor = descriptorRegistry.getRequiredDescriptor(SerializableWithReplaceWithSimple.class);
- assertThat(marshalled.usedDescriptors(), not(hasItem(originalDescriptor)));
+ assertThat(marshalled.usedDescriptorIds(), not(hasItem(originalDescriptor.descriptorId())));
ClassDescriptor replacementDescriptor = descriptorRegistry.getRequiredDescriptor(SimpleSerializable.class);
- assertThat(marshalled.usedDescriptors(), hasItem(replacementDescriptor));
+ assertThat(marshalled.usedDescriptorIds(), hasItem(replacementDescriptor.descriptorId()));
}
@Test
@@ -131,7 +131,7 @@ class DefaultUserObjectMarshallerWithSerializableTest {
MarshalledObject marshalled = marshaller.marshal(new SerializableWithReplaceWithNull(42));
ClassDescriptor replacementDescriptor = descriptorRegistry.getNullDescriptor();
- assertThat(marshalled.usedDescriptors(), equalTo(Set.of(replacementDescriptor)));
+ assertThat(marshalled.usedDescriptorIds(), equalTo(Set.of(replacementDescriptor.descriptorId())));
}
@Test
diff --git a/parent/pom.xml b/parent/pom.xml
index 0d90766..2c2a1f9 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -86,6 +86,7 @@
<msgpack.version>0.8.21</msgpack.version>
<caffeine.version>3.0.4</caffeine.version>
<fastutil.version>8.5.6</fastutil.version>
+ <kryo.version>4.0.1</kryo.version>
<!-- Plugins versions -->
<apache.rat.plugin.version>0.13</apache.rat.plugin.version>
@@ -593,6 +594,13 @@
<version>${jmh.framework.version}</version>
</dependency>
+ <!-- We currently only use Kryo for benchmarking our User Object Serialization -->
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo</artifactId>
+ <version>${kryo.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>