You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/04/09 09:27:02 UTC
[ignite-3] branch main updated: IGNITE-14084 Direct marshalling
code converted from 2.x codebase (#82).
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 49bfd0f IGNITE-14084 Direct marshalling code converted from 2.x codebase (#82).
49bfd0f is described below
commit 49bfd0fc970f633615ae7e96c958650287ff7f85
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Fri Apr 9 12:26:09 2021 +0300
IGNITE-14084 Direct marshalling code converted from 2.x codebase (#82).
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
modules/core/pom.xml | 5 +
.../ignite/internal/util/DirectBufferCleaner.java} | 15 +-
.../ignite/internal/util/FeatureChecker.java | 40 +
.../apache/ignite/internal/util/GridUnsafe.java | 2013 ++++++++++++++++++++
.../apache/ignite/internal/util/IgniteUtils.java | 116 ++
.../util/ReflectiveDirectBufferCleaner.java | 65 +
.../internal/util/UnsafeDirectBufferCleaner.java} | 34 +-
.../communication/MessageCollectionItemType.java | 102 +
modules/network/pom.xml | 6 +
.../ignite/network/DirectSerializationTest.java | 120 ++
.../ITScaleCubeNetworkClusterMessagingTest.java | 10 +-
.../ignite/network/scalecube/TestMessage.java | 38 +-
.../scalecube/TestMessageMapperProvider.java | 54 -
.../scalecube/TestMessageSerializerProvider.java | 117 ++
...der.java => TestRequestSerializerProvider.java} | 30 +-
...er.java => TestResponseSerializerProvider.java} | 30 +-
.../java/org/apache/ignite/network/Network.java | 12 +-
.../ignite/network/NetworkClusterContext.java | 14 +-
.../ignite/network/internal/MessageReader.java | 287 ++-
.../network/internal/MessageSerializerFactory.java | 55 +
.../ignite/network/internal/MessageWriter.java | 315 ++-
.../internal/direct/DirectMessageReader.java | 410 ++++
.../internal/direct/DirectMessageWriter.java | 373 ++++
.../internal/direct/state/DirectMessageState.java | 96 +
.../direct/state/DirectMessageStateItem.java} | 11 +-
.../direct/stream/DirectByteBufferStream.java | 319 ++++
.../stream/DirectByteBufferStreamImplV1.java | 1805 ++++++++++++++++++
.../message/DefaultMessageMapperProvider.java | 54 -
.../network/message/MessageDeserializer.java | 10 +-
.../ignite/network/message/MessageSerializer.java | 5 +-
...rovider.java => MessageSerializerProvider.java} | 7 +-
.../ignite/network/message/NetworkMessage.java | 3 +
.../network/scalecube/ScaleCubeMessageCodec.java | 133 --
.../network/scalecube/ScaleCubeMessageWriter.java | 38 -
.../network/scalecube/ScaleCubeNetworkCluster.java | 11 +-
.../scalecube/ScaleCubeNetworkClusterFactory.java | 3 +-
.../raft/server/ITRaftCounterServerTest.java | 12 +-
.../ignite/raft/server/impl/RaftServerImpl.java | 12 +-
38 files changed, 6369 insertions(+), 411 deletions(-)
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index c311ec1..6b1d43b 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -35,6 +35,11 @@
<version>3.0.0-SNAPSHOT</version>
<dependencies>
+ <dependency>
+ <groupId>org.jetbrains</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+
<!-- Test dependencies. -->
<dependency>
<groupId>org.junit.jupiter</groupId>
diff --git a/modules/network/src/main/java/org/apache/ignite/network/message/NetworkMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/util/DirectBufferCleaner.java
similarity index 75%
copy from modules/network/src/main/java/org/apache/ignite/network/message/NetworkMessage.java
copy to modules/core/src/main/java/org/apache/ignite/internal/util/DirectBufferCleaner.java
index 50b4975..4264791 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/message/NetworkMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/DirectBufferCleaner.java
@@ -14,14 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.network.message;
+
+package org.apache.ignite.internal.util;
+
+import java.nio.ByteBuffer;
/**
- * Message for exchange information in cluster.
+ * Cleaner interface for {@code java.nio.ByteBuffer}.
*/
-public interface NetworkMessage {
+public interface DirectBufferCleaner {
/**
- * @return Message type.
+ * Cleans the direct buffer.
+ *
+ * @param buf direct buffer.
*/
- public abstract short directType();
+ public void clean(ByteBuffer buf);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/FeatureChecker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/FeatureChecker.java
new file mode 100644
index 0000000..2aa2141
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/FeatureChecker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+/**
+ * Holds constants extracted from GridUnsafe so that they are completely independent of the effects of its current
+ * and future static initialization blocks.
+ */
+public class FeatureChecker {
+ /** Required options to run on Java 9, 10, 11. */
+ public static final String JAVA_9_10_11_OPTIONS = "--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED\n" +
+ "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED\n" +
+ "--add-exports=java.management/com.sun.jmx.mbeanserver=ALL-UNNAMED\n" +
+ "--add-exports=jdk.internal.jvmstat/sun.jvmstat.monitor=ALL-UNNAMED\n" +
+ "--add-exports=java.base/sun.reflect.generics.reflectiveObjects=ALL-UNNAMED\n" +
+ "--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED\n" +
+ "--illegal-access=permit";
+
+ /** Java-version-specific warning to be added to error messages in case access failed. */
+ public static final String JAVA_VER_SPECIFIC_WARN =
+ "\nPlease add the following parameters to JVM startup settings and restart the application: {parameters: " +
+ JAVA_9_10_11_OPTIONS +
+ "\n}" +
+ "\nSee https://apacheignite.readme.io/docs/getting-started#section-running-ignite-with-java-9-10-11 for more information.";
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
new file mode 100644
index 0000000..2dbd4fd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -0,0 +1,2013 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import sun.misc.Unsafe;
+
+import static org.apache.ignite.internal.util.IgniteUtils.jdkVersion;
+import static org.apache.ignite.internal.util.IgniteUtils.majorJavaVersion;
+
+/**
+ * <p>Wrapper for the {@link sun.misc.Unsafe} class.</p>
+ *
+ * <p>
+ * All memory access operations have the following properties:
+ * <ul>
+ * <li>All {@code putXxx(long addr, xxx val)}, {@code getXxx(long addr)}, {@code putXxx(byte[] arr, long off, xxx val)},
+ * {@code getXxx(byte[] arr, long off)} and corresponding methods with {@code LE} suffix are alignment aware
+ * and can be safely used with unaligned pointers.</li>
+ * <li>All {@code putXxxField(Object obj, long fieldOff, xxx val)} and {@code getXxxField(Object obj, long fieldOff)}
+ * methods are not alignment aware and can't be safely used with unaligned pointers. These methods can be safely used
+ * for accessing object field values because all object field addresses are aligned.</li>
+ * <li>All {@code putXxxLE(...)} and {@code getXxxLE(...)} methods assume that the byte order is fixed as little-endian
+ * while native byte order is big-endian. So it is the client code's responsibility to check native byte order before
+ * invoking these methods.</li>
+ * </ul>
+ * </p>
+ */
+public abstract class GridUnsafe {
+ /** */
+ public static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder();
+
+ /** Unsafe. */
+ private static final Unsafe UNSAFE = unsafe();
+
+ /** Unaligned flag. */
+ private static final boolean UNALIGNED = unaligned();
+
+ /** Per-byte copy threshold. */
+ private static final long PER_BYTE_THRESHOLD = 0L;
+
+ /** Big endian. */
+ public static final boolean BIG_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
+
+ /** Address size. */
+ public static final int ADDR_SIZE = UNSAFE.addressSize();
+
+ /** */
+ public static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+ /** */
+ public static final int BYTE_ARR_INT_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+ /** */
+ public static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
+
+ /** */
+ public static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
+
+ /** */
+ public static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
+
+ /** */
+ public static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
+
+ /** */
+ public static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
+
+ /** */
+ public static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
+
+ /** */
+ public static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
+
+ /** {@link java.nio.Buffer#address} field offset. */
+ private static final long DIRECT_BUF_ADDR_OFF = bufferAddressOffset();
+
+ /** Cleaner code for direct {@code java.nio.ByteBuffer}. */
+ private static final DirectBufferCleaner DIRECT_BUF_CLEANER =
+ majorJavaVersion(jdkVersion()) < 9
+ ? new ReflectiveDirectBufferCleaner()
+ : new UnsafeDirectBufferCleaner();
+
+ /** JavaNioAccess object. If {@code null} then {@link #NEW_DIRECT_BUF_CONSTRUCTOR} should be available. */
+ @Nullable private static final Object JAVA_NIO_ACCESS_OBJ;
+
+ /**
+ * JavaNioAccess#newDirectByteBuffer method. Usually {@code null} if {@link #JAVA_NIO_ACCESS_OBJ} is {@code null}.
+ * If {@code null} then {@link #NEW_DIRECT_BUF_CONSTRUCTOR} should be available.
+ */
+ @Nullable private static final Method NEW_DIRECT_BUF_MTD;
+
+ /**
+ * New direct buffer class constructor obtained and tested using reflection. If {@code null} then both {@link
+ * #JAVA_NIO_ACCESS_OBJ} and {@link #NEW_DIRECT_BUF_MTD} should be not {@code null}.
+ */
+ @Nullable private static final Constructor<?> NEW_DIRECT_BUF_CONSTRUCTOR;
+
+ static {
+ Object nioAccessObj = null;
+ Method directBufMtd = null;
+
+ Constructor<?> directBufCtor = null;
+
+ if (majorJavaVersion(jdkVersion()) < 12) {
+ // For older Java versions, prefer initialization via the Java NIO & Shared Secrets object.
+ try {
+ nioAccessObj = javaNioAccessObject();
+ directBufMtd = newDirectBufferMethod(nioAccessObj);
+ }
+ catch (Exception e) {
+ nioAccessObj = null;
+ directBufMtd = null;
+
+ try {
+ directBufCtor = createAndTestNewDirectBufferCtor();
+ }
+ catch (Exception eFallback) {
+ //noinspection CallToPrintStackTrace
+ eFallback.printStackTrace();
+
+ e.addSuppressed(eFallback);
+
+ throw e; // fallback was not successful
+ }
+
+ if (directBufCtor == null)
+ throw e;
+ }
+ }
+ else {
+ try {
+ directBufCtor = createAndTestNewDirectBufferCtor();
+ }
+ catch (Exception e) {
+ try {
+ nioAccessObj = javaNioAccessObject();
+ directBufMtd = newDirectBufferMethod(nioAccessObj);
+ }
+ catch (Exception eFallback) {
+ //noinspection CallToPrintStackTrace
+ eFallback.printStackTrace();
+
+ e.addSuppressed(eFallback);
+
+ throw e; //fallback to shared secrets failed.
+ }
+
+ if (nioAccessObj == null || directBufMtd == null)
+ throw e;
+ }
+ }
+
+ JAVA_NIO_ACCESS_OBJ = nioAccessObj;
+ NEW_DIRECT_BUF_MTD = directBufMtd;
+
+ NEW_DIRECT_BUF_CONSTRUCTOR = directBufCtor;
+ }
+
+ /**
+ * Private constructor: prevents instantiation of this static utility class.
+ */
+ private GridUnsafe() {
+ // No-op.
+ }
+
+ /**
+ * Wraps a pointer to unmanaged memory into a direct byte buffer.
+ *
+ * @param ptr Pointer to wrap.
+ * @param len Memory location length.
+ * @return Byte buffer wrapping the given memory.
+ */
+ public static ByteBuffer wrapPointer(long ptr, int len) {
+ if (NEW_DIRECT_BUF_MTD != null && JAVA_NIO_ACCESS_OBJ != null)
+ return wrapPointerJavaNio(ptr, len, NEW_DIRECT_BUF_MTD, JAVA_NIO_ACCESS_OBJ);
+ else if (NEW_DIRECT_BUF_CONSTRUCTOR != null)
+ return wrapPointerDirectBufferConstructor(ptr, len, NEW_DIRECT_BUF_CONSTRUCTOR);
+ else
+ throw new RuntimeException("All alternatives for a new DirectByteBuffer() creation failed: " + FeatureChecker.JAVA_VER_SPECIFIC_WARN);
+ }
+
+ /**
+ * Wraps a pointer to unmanaged memory with a direct byte buffer using the direct byte buffer's constructor.
+ *
+ * @param ptr Pointer to wrap.
+ * @param len Memory location length.
+ * @param constructor Constructor to use. Should create an instance of a direct ByteBuffer.
+ * @return Byte buffer wrapping the given memory.
+ */
+ @NotNull private static ByteBuffer wrapPointerDirectBufferConstructor(long ptr, int len, Constructor<?> constructor) {
+ try {
+ Object newDirectBuf = constructor.newInstance(ptr, len);
+
+ return ((ByteBuffer)newDirectBuf).order(NATIVE_BYTE_ORDER);
+ }
+ catch (ReflectiveOperationException e) {
+ throw new RuntimeException("DirectByteBuffer#constructor is unavailable."
+ + FeatureChecker.JAVA_VER_SPECIFIC_WARN, e);
+ }
+ }
+
+ /**
+ * Wraps a pointer to unmanaged memory with a direct byte buffer using a JavaNioAccess object.
+ *
+ * @param ptr Pointer to wrap.
+ * @param len Memory location length.
+ * @param newDirectBufMtd Method which should return an instance of a direct byte buffer.
+ * @param javaNioAccessObj Object to invoke method.
+ * @return Byte buffer wrapping the given memory.
+ */
+ @NotNull private static ByteBuffer wrapPointerJavaNio(long ptr,
+ int len,
+ @NotNull Method newDirectBufMtd,
+ @NotNull Object javaNioAccessObj) {
+ try {
+ ByteBuffer buf = (ByteBuffer)newDirectBufMtd.invoke(javaNioAccessObj, ptr, len, null);
+
+ assert buf.isDirect();
+
+ buf.order(NATIVE_BYTE_ORDER);
+
+ return buf;
+ }
+ catch (ReflectiveOperationException e) {
+ throw new RuntimeException("JavaNioAccess#newDirectByteBuffer() method is unavailable."
+ + FeatureChecker.JAVA_VER_SPECIFIC_WARN, e);
+ }
+ }
+
+ /**
+ * @param len Length.
+ * @return Allocated direct buffer.
+ */
+ public static ByteBuffer allocateBuffer(int len) {
+ long ptr = allocateMemory(len);
+
+ return wrapPointer(ptr, len);
+ }
+
+ /**
+ * @param buf Direct buffer allocated by {@link #allocateBuffer(int)}.
+ */
+ public static void freeBuffer(ByteBuffer buf) {
+ long ptr = bufferAddress(buf);
+
+ freeMemory(ptr);
+ }
+
+ /**
+ *
+ * @param buf Buffer.
+ * @param len New length.
+ * @return Reallocated direct buffer.
+ */
+ public static ByteBuffer reallocateBuffer(ByteBuffer buf, int len) {
+ long ptr = bufferAddress(buf);
+
+ long newPtr = reallocateMemory(ptr, len);
+
+ return wrapPointer(newPtr, len);
+ }
+
+ /**
+ * Gets a boolean value from an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @return Boolean value from object field.
+ */
+ public static boolean getBooleanField(Object obj, long fieldOff) {
+ return UNSAFE.getBoolean(obj, fieldOff);
+ }
+
+ /**
+ * Stores a boolean value into an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @param val Value.
+ */
+ public static void putBooleanField(Object obj, long fieldOff, boolean val) {
+ UNSAFE.putBoolean(obj, fieldOff, val);
+ }
+
+ /**
+ * Gets a byte value from an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @return Byte value from object field.
+ */
+ public static byte getByteField(Object obj, long fieldOff) {
+ return UNSAFE.getByte(obj, fieldOff);
+ }
+
+ /**
+ * Stores a byte value into an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @param val Value.
+ */
+ public static void putByteField(Object obj, long fieldOff, byte val) {
+ UNSAFE.putByte(obj, fieldOff, val);
+ }
+
+ /**
+ * Gets a short value from an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @return Short value from object field.
+ */
+ public static short getShortField(Object obj, long fieldOff) {
+ return UNSAFE.getShort(obj, fieldOff);
+ }
+
+ /**
+ * Stores a short value into an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @param val Value.
+ */
+ public static void putShortField(Object obj, long fieldOff, short val) {
+ UNSAFE.putShort(obj, fieldOff, val);
+ }
+
+ /**
+ * Gets a char value from an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @return Char value from object field.
+ */
+ public static char getCharField(Object obj, long fieldOff) {
+ return UNSAFE.getChar(obj, fieldOff);
+ }
+
+ /**
+ * Stores a char value into an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @param val Value.
+ */
+ public static void putCharField(Object obj, long fieldOff, char val) {
+ UNSAFE.putChar(obj, fieldOff, val);
+ }
+
+ /**
+ * Gets an integer value from an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @return Integer value from object field.
+ */
+ public static int getIntField(Object obj, long fieldOff) {
+ return UNSAFE.getInt(obj, fieldOff);
+ }
+
+ /**
+ * Stores an integer value into an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @param val Value.
+ */
+ public static void putIntField(Object obj, long fieldOff, int val) {
+ UNSAFE.putInt(obj, fieldOff, val);
+ }
+
+ /**
+ * Gets a long value from an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @return Long value from object field.
+ */
+ public static long getLongField(Object obj, long fieldOff) {
+ return UNSAFE.getLong(obj, fieldOff);
+ }
+
+ /**
+ * Stores a long value into an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @param val Value.
+ */
+ public static void putLongField(Object obj, long fieldOff, long val) {
+ UNSAFE.putLong(obj, fieldOff, val);
+ }
+
+ /**
+ * Gets a float value from an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @return Float value from object field.
+ */
+ public static float getFloatField(Object obj, long fieldOff) {
+ return UNSAFE.getFloat(obj, fieldOff);
+ }
+
+ /**
+ * Stores a float value into an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @param val Value.
+ */
+ public static void putFloatField(Object obj, long fieldOff, float val) {
+ UNSAFE.putFloat(obj, fieldOff, val);
+ }
+
+ /**
+ * Gets a double value from an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @return Double value from object field.
+ */
+ public static double getDoubleField(Object obj, long fieldOff) {
+ return UNSAFE.getDouble(obj, fieldOff);
+ }
+
+ /**
+ * Stores a double value into an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @param val Value.
+ */
+ public static void putDoubleField(Object obj, long fieldOff, double val) {
+ UNSAFE.putDouble(obj, fieldOff, val);
+ }
+
+ /**
+ * Gets a reference from an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @return Reference from object field.
+ */
+ public static Object getObjectField(Object obj, long fieldOff) {
+ return UNSAFE.getObject(obj, fieldOff);
+ }
+
+ /**
+ * Stores a reference value into an object field.
+ *
+ * @param obj Object.
+ * @param fieldOff Field offset.
+ * @param val Value.
+ */
+ public static void putObjectField(Object obj, long fieldOff, Object val) {
+ UNSAFE.putObject(obj, fieldOff, val);
+ }
+
+ /**
+ * Gets a boolean value from a byte array.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Boolean value from byte array.
+ */
+ public static boolean getBoolean(byte[] arr, long off) {
+ return UNSAFE.getBoolean(arr, off);
+ }
+
+ /**
+ * Stores a boolean value into a byte array.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putBoolean(byte[] arr, long off, boolean val) {
+ UNSAFE.putBoolean(arr, off, val);
+ }
+
+ /**
+ * Gets a byte value from a byte array.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Byte value from byte array.
+ */
+ public static byte getByte(byte[] arr, long off) {
+ return UNSAFE.getByte(arr, off);
+ }
+
+ /**
+ * Stores a byte value into a byte array.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putByte(byte[] arr, long off, byte val) {
+ UNSAFE.putByte(arr, off, val);
+ }
+
+ /**
+ * Gets a short value from a byte array. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Short value from byte array.
+ */
+ public static short getShort(byte[] arr, long off) {
+ return UNALIGNED ? UNSAFE.getShort(arr, off) : getShortByByte(arr, off, BIG_ENDIAN);
+ }
+
+ /**
+ * Stores a short value into a byte array. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putShort(byte[] arr, long off, short val) {
+ if (UNALIGNED)
+ UNSAFE.putShort(arr, off, val);
+ else
+ putShortByByte(arr, off, val, BIG_ENDIAN);
+ }
+
+ /**
+ * Gets a char value from a byte array. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Char value from byte array.
+ */
+ public static char getChar(byte[] arr, long off) {
+ return UNALIGNED ? UNSAFE.getChar(arr, off) : getCharByByte(arr, off, BIG_ENDIAN);
+ }
+
+ /**
+ * Stores a char value into a byte array. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putChar(byte[] arr, long off, char val) {
+ if (UNALIGNED)
+ UNSAFE.putChar(arr, off, val);
+ else
+ putCharByByte(arr, off, val, BIG_ENDIAN);
+ }
+
+ /**
+ * Gets an integer value from a byte array. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Integer value from byte array.
+ */
+ public static int getInt(byte[] arr, long off) {
+ return UNALIGNED ? UNSAFE.getInt(arr, off) : getIntByByte(arr, off, BIG_ENDIAN);
+ }
+
+ /**
+ * Stores an integer value into a byte array. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putInt(byte[] arr, long off, int val) {
+ if (UNALIGNED)
+ UNSAFE.putInt(arr, off, val);
+ else
+ putIntByByte(arr, off, val, BIG_ENDIAN);
+ }
+
+ /**
+ * Gets a long value from a byte array. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Long value from byte array.
+ */
+ public static long getLong(byte[] arr, long off) {
+ return UNALIGNED ? UNSAFE.getLong(arr, off) : getLongByByte(arr, off, BIG_ENDIAN);
+ }
+
+ /**
+ * Stores a long value into a byte array. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putLong(byte[] arr, long off, long val) {
+ if (UNALIGNED)
+ UNSAFE.putLong(arr, off, val);
+ else
+ putLongByByte(arr, off, val, BIG_ENDIAN);
+ }
+
+ /**
+ * Gets a float value from a byte array. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Float value from byte array.
+ */
+ public static float getFloat(byte[] arr, long off) {
+ return UNALIGNED ? UNSAFE.getFloat(arr, off) : Float.intBitsToFloat(getIntByByte(arr, off, BIG_ENDIAN));
+ }
+
+ /**
+ * Stores a float value into a byte array. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putFloat(byte[] arr, long off, float val) {
+ if (UNALIGNED)
+ UNSAFE.putFloat(arr, off, val);
+ else
+ putIntByByte(arr, off, Float.floatToIntBits(val), BIG_ENDIAN);
+ }
+
+ /**
+ * Gets a double value from a byte array. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Double value from byte array. Alignment aware.
+ */
+ public static double getDouble(byte[] arr, long off) {
+ return UNALIGNED ? UNSAFE.getDouble(arr, off) : Double.longBitsToDouble(getLongByByte(arr, off, BIG_ENDIAN));
+ }
+
+ /**
+ * Stores a double value into a byte array. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putDouble(byte[] arr, long off, double val) {
+ if (UNALIGNED)
+ UNSAFE.putDouble(arr, off, val);
+ else
+ putLongByByte(arr, off, Double.doubleToLongBits(val), BIG_ENDIAN);
+ }
+
+ /**
+ * Gets short value from byte array assuming that value stored in little-endian byte order and native byte order
+ * is big-endian. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Short value from byte array.
+ */
+ public static short getShortLE(byte[] arr, long off) {
+ return UNALIGNED ? Short.reverseBytes(UNSAFE.getShort(arr, off)) : getShortByByte(arr, off, false);
+ }
+
+ /**
+ * Stores short value into byte array assuming that value should be stored in little-endian byte order and native
+ * byte order is big-endian. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putShortLE(byte[] arr, long off, short val) {
+ if (UNALIGNED)
+ UNSAFE.putShort(arr, off, Short.reverseBytes(val));
+ else
+ putShortByByte(arr, off, val, false);
+ }
+
+ /**
+ * Gets char value from byte array assuming that value stored in little-endian byte order and native byte order
+ * is big-endian. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Char value from byte array.
+ */
+ public static char getCharLE(byte[] arr, long off) {
+ return UNALIGNED ? Character.reverseBytes(UNSAFE.getChar(arr, off)) : getCharByByte(arr, off, false);
+ }
+
+ /**
+ * Stores char value into byte array assuming that value should be stored in little-endian byte order and native
+ * byte order is big-endian. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putCharLE(byte[] arr, long off, char val) {
+ if (UNALIGNED)
+ UNSAFE.putChar(arr, off, Character.reverseBytes(val));
+ else
+ putCharByByte(arr, off, val, false);
+ }
+
+ /**
+ * Gets integer value from byte array assuming that value stored in little-endian byte order and native byte order
+ * is big-endian. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Integer value from byte array.
+ */
+ public static int getIntLE(byte[] arr, long off) {
+ return UNALIGNED ? Integer.reverseBytes(UNSAFE.getInt(arr, off)) : getIntByByte(arr, off, false);
+ }
+
+ /**
+ * Stores integer value into byte array assuming that value should be stored in little-endian byte order and
+ * native byte order is big-endian. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putIntLE(byte[] arr, long off, int val) {
+ if (UNALIGNED)
+ UNSAFE.putInt(arr, off, Integer.reverseBytes(val));
+ else
+ putIntByByte(arr, off, val, false);
+ }
+
+ /**
+ * Gets long value from byte array assuming that value stored in little-endian byte order and native byte order
+ * is big-endian. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Long value from byte array.
+ */
+ public static long getLongLE(byte[] arr, long off) {
+ return UNALIGNED ? Long.reverseBytes(UNSAFE.getLong(arr, off)) : getLongByByte(arr, off, false);
+ }
+
+ /**
+ * Stores long value into byte array assuming that value should be stored in little-endian byte order and native
+ * byte order is big-endian. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putLongLE(byte[] arr, long off, long val) {
+ if (UNALIGNED)
+ UNSAFE.putLong(arr, off, Long.reverseBytes(val));
+ else
+ putLongByByte(arr, off, val, false);
+ }
+
+ /**
+ * Gets float value from byte array assuming that value stored in little-endian byte order and native byte order
+ * is big-endian. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Float value from byte array.
+ */
+ public static float getFloatLE(byte[] arr, long off) {
+ return Float.intBitsToFloat(
+ UNALIGNED ? Integer.reverseBytes(UNSAFE.getInt(arr, off)) : getIntByByte(arr, off, false)
+ );
+ }
+
+ /**
+ * Stores float value into byte array assuming that value should be stored in little-endian byte order and native
+ * byte order is big-endian. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putFloatLE(byte[] arr, long off, float val) {
+ int intVal = Float.floatToIntBits(val);
+
+ if (UNALIGNED)
+ UNSAFE.putInt(arr, off, Integer.reverseBytes(intVal));
+ else
+ putIntByByte(arr, off, intVal, false);
+ }
+
+ /**
+ * Gets double value from byte array assuming that value stored in little-endian byte order and native byte order
+ * is big-endian. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @return Double value from byte array.
+ */
+ public static double getDoubleLE(byte[] arr, long off) {
+ return Double.longBitsToDouble(
+ UNALIGNED ? Long.reverseBytes(UNSAFE.getLong(arr, off)) : getLongByByte(arr, off, false)
+ );
+ }
+
+ /**
+ * Stores double value into byte array assuming that value should be stored in little-endian byte order and
+ * native byte order is big-endian. Alignment aware.
+ *
+ * @param arr Byte array.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putDoubleLE(byte[] arr, long off, double val) {
+ long longVal = Double.doubleToLongBits(val);
+
+ if (UNALIGNED)
+ UNSAFE.putLong(arr, off, Long.reverseBytes(longVal));
+ else
+ putLongByByte(arr, off, longVal, false);
+ }
+
+ /**
+ * Gets byte value from given address.
+ *
+ * @param addr Address.
+ * @return Byte value from given address.
+ */
+ public static byte getByte(long addr) {
+ return UNSAFE.getByte(addr);
+ }
+
+ /**
+ * Stores given byte value.
+ *
+ * @param addr Address.
+ * @param val Value.
+ */
+ public static void putByte(long addr, byte val) {
+ // Single-byte store: no alignment or byte-order handling is involved.
+ UNSAFE.putByte(addr, val);
+ }
+
+ /**
+ * Gets short value from given address. Alignment aware.
+ *
+ * @param addr Address.
+ * @return Short value from given address.
+ */
+ public static short getShort(long addr) {
+     // Without unaligned access the value is assembled byte by byte according to BIG_ENDIAN.
+     if (!UNALIGNED)
+         return getShortByByte(addr, BIG_ENDIAN);
+
+     return UNSAFE.getShort(addr);
+ }
+
+ /**
+ * Stores given short value. Alignment aware.
+ *
+ * @param addr Address.
+ * @param val Value.
+ */
+ public static void putShort(long addr, short val) {
+ if (UNALIGNED)
+ UNSAFE.putShort(addr, val);
+ else
+ putShortByByte(addr, val, BIG_ENDIAN);
+ }
+
+ /**
+ * Gets char value from given address. Alignment aware.
+ *
+ * @param addr Address.
+ * @return Char value from given address.
+ */
+ public static char getChar(long addr) {
+     // Without unaligned access the value is assembled byte by byte according to BIG_ENDIAN.
+     if (!UNALIGNED)
+         return getCharByByte(addr, BIG_ENDIAN);
+
+     return UNSAFE.getChar(addr);
+ }
+
+ /**
+ * Stores given char value. Alignment aware.
+ *
+ * @param addr Address.
+ * @param val Value.
+ */
+ public static void putChar(long addr, char val) {
+ if (UNALIGNED)
+ UNSAFE.putChar(addr, val);
+ else
+ putCharByByte(addr, val, BIG_ENDIAN);
+ }
+
+ /**
+ * Gets integer value from given address. Alignment aware.
+ *
+ * @param addr Address.
+ * @return Integer value from given address.
+ */
+ public static int getInt(long addr) {
+     // Without unaligned access the value is assembled byte by byte according to BIG_ENDIAN.
+     if (!UNALIGNED)
+         return getIntByByte(addr, BIG_ENDIAN);
+
+     return UNSAFE.getInt(addr);
+ }
+
+ /**
+ * Stores given integer value. Alignment aware.
+ *
+ * @param addr Address.
+ * @param val Value.
+ */
+ public static void putInt(long addr, int val) {
+ if (UNALIGNED)
+ UNSAFE.putInt(addr, val);
+ else
+ putIntByByte(addr, val, BIG_ENDIAN);
+ }
+
+ /**
+ * Gets long value from given address. Alignment aware.
+ *
+ * @param addr Address.
+ * @return Long value from given address.
+ */
+ public static long getLong(long addr) {
+     // Without unaligned access the value is assembled byte by byte according to BIG_ENDIAN.
+     if (!UNALIGNED)
+         return getLongByByte(addr, BIG_ENDIAN);
+
+     return UNSAFE.getLong(addr);
+ }
+
+ /**
+ * Stores given long value. Alignment aware.
+ *
+ * @param addr Address.
+ * @param val Value.
+ */
+ public static void putLong(long addr, long val) {
+ if (UNALIGNED)
+ UNSAFE.putLong(addr, val);
+ else
+ putLongByByte(addr, val, BIG_ENDIAN);
+ }
+
+ /**
+ * Gets float value from given address. Alignment aware.
+ *
+ * @param addr Address.
+ * @return Float value from given address.
+ */
+ public static float getFloat(long addr) {
+ return UNALIGNED ? UNSAFE.getFloat(addr) : Float.intBitsToFloat(getIntByByte(addr, BIG_ENDIAN));
+ }
+
+ /**
+ * Stores given float value. Alignment aware.
+ *
+ * @param addr Address.
+ * @param val Value.
+ */
+ public static void putFloat(long addr, float val) {
+ if (UNALIGNED)
+ UNSAFE.putFloat(addr, val);
+ else
+ putIntByByte(addr, Float.floatToIntBits(val), BIG_ENDIAN);
+ }
+
+ /**
+ * Gets double value from given address. Alignment aware.
+ *
+ * @param addr Address.
+ * @return Double value from given address.
+ */
+ public static double getDouble(long addr) {
+ return UNALIGNED ? UNSAFE.getDouble(addr) : Double.longBitsToDouble(getLongByByte(addr, BIG_ENDIAN));
+ }
+
+ /**
+ * Stores given double value. Alignment aware.
+ *
+ * @param addr Address.
+ * @param val Value.
+ */
+ public static void putDouble(long addr, double val) {
+ if (UNALIGNED)
+ UNSAFE.putDouble(addr, val);
+ else
+ putLongByByte(addr, Double.doubleToLongBits(val), BIG_ENDIAN);
+ }
+
+ /**
+ * Gets short value from given address assuming that value stored in little-endian byte order and native byte order
+ * is big-endian. Alignment aware.
+ *
+ * @param addr Address.
+ * @return Short value from given address.
+ */
+ public static short getShortLE(long addr) {
+     // Byte-wise little-endian read when unaligned access is not available.
+     if (!UNALIGNED)
+         return getShortByByte(addr, false);
+
+     return Short.reverseBytes(UNSAFE.getShort(addr));
+ }
+
+ /**
+ * Stores given short value assuming that value should be stored in little-endian byte order and native byte
+ * order is big-endian. Alignment aware.
+ *
+ * @param addr Address.
+ * @param val Value.
+ */
+ public static void putShortLE(long addr, short val) {
+ if (UNALIGNED)
+ UNSAFE.putShort(addr, Short.reverseBytes(val));
+ else
+ putShortByByte(addr, val, false);
+ }
+
+ /**
+ * Gets char value from given address assuming that value stored in little-endian byte order and native byte order
+ * is big-endian. Alignment aware.
+ *
+ * @param addr Address.
+ * @return Char value from given address.
+ */
+ public static char getCharLE(long addr) {
+     // Byte-wise little-endian read when unaligned access is not available.
+     if (!UNALIGNED)
+         return getCharByByte(addr, false);
+
+     return Character.reverseBytes(UNSAFE.getChar(addr));
+ }
+
+ /**
+ * Stores given char value assuming that value should be stored in little-endian byte order and native byte order
+ * is big-endian. Alignment aware.
+ *
+ * @param addr Address.
+ * @param val Value.
+ */
+ public static void putCharLE(long addr, char val) {
+ if (UNALIGNED)
+ UNSAFE.putChar(addr, Character.reverseBytes(val));
+ else
+ putCharByByte(addr, val, false);
+ }
+
+ /**
+ * Gets integer value from given address assuming that value stored in little-endian byte order
+ * and native byte order is big-endian. Alignment aware.
+ *
+ * @param addr Address.
+ * @return Integer value from given address.
+ */
+ public static int getIntLE(long addr) {
+     // Byte-wise little-endian read when unaligned access is not available.
+     if (!UNALIGNED)
+         return getIntByByte(addr, false);
+
+     return Integer.reverseBytes(UNSAFE.getInt(addr));
+ }
+
+ /**
+ * Stores given integer value assuming that value should be stored in little-endian byte order
+ * and native byte order is big-endian. Alignment aware.
+ *
+ * @param addr Address.
+ * @param val Value.
+ */
+ public static void putIntLE(long addr, int val) {
+ if (UNALIGNED)
+ UNSAFE.putInt(addr, Integer.reverseBytes(val));
+ else
+ putIntByByte(addr, val, false);
+ }
+
+ /**
+ * Gets long value from given address assuming that value stored in little-endian byte order
+ * and native byte order is big-endian. Alignment aware.
+ *
+ * @param addr Address.
+ * @return Long value from given address.
+ */
+ public static long getLongLE(long addr) {
+     // Byte-wise little-endian read when unaligned access is not available.
+     if (!UNALIGNED)
+         return getLongByByte(addr, false);
+
+     return Long.reverseBytes(UNSAFE.getLong(addr));
+ }
+
+ /**
+ * Stores given long value assuming that value should be stored in little-endian byte order
+ * and native byte order is big-endian. Alignment aware.
+ *
+ * @param addr Address.
+ * @param val Value.
+ */
+ public static void putLongLE(long addr, long val) {
+ if (UNALIGNED)
+ UNSAFE.putLong(addr, Long.reverseBytes(val));
+ else
+ putLongByByte(addr, val, false);
+ }
+
+ /**
+ * Gets float value from given address assuming that value stored in little-endian byte order
+ * and native byte order is big-endian. Alignment aware.
+ *
+ * @param addr Address.
+ * @return Float value from given address.
+ */
+ public static float getFloatLE(long addr) {
+     int bits;
+
+     // Native read with swapped bytes when unaligned access is available, byte-wise little-endian read otherwise.
+     if (UNALIGNED)
+         bits = Integer.reverseBytes(UNSAFE.getInt(addr));
+     else
+         bits = getIntByByte(addr, false);
+
+     return Float.intBitsToFloat(bits);
+ }
+
+ /**
+ * Stores given float value assuming that value should be stored in little-endian byte order
+ * and native byte order is big-endian. Alignment aware.
+ *
+ * @param addr Address.
+ * @param val Value.
+ */
+ public static void putFloatLE(long addr, float val) {
+ int intVal = Float.floatToIntBits(val);
+
+ if (UNALIGNED)
+ UNSAFE.putInt(addr, Integer.reverseBytes(intVal));
+ else
+ putIntByByte(addr, intVal, false);
+ }
+
+ /**
+ * Gets double value from given address assuming that value stored in little-endian byte order
+ * and native byte order is big-endian. Alignment aware.
+ *
+ * @param addr Address.
+ * @return Double value from given address.
+ */
+ public static double getDoubleLE(long addr) {
+     long bits;
+
+     // Native read with swapped bytes when unaligned access is available, byte-wise little-endian read otherwise.
+     if (UNALIGNED)
+         bits = Long.reverseBytes(UNSAFE.getLong(addr));
+     else
+         bits = getLongByByte(addr, false);
+
+     return Double.longBitsToDouble(bits);
+ }
+
+ /**
+ * Stores given double value assuming that value should be stored in little-endian byte order
+ * and native byte order is big-endian. Alignment aware.
+ *
+ * @param addr Address.
+ * @param val Value.
+ */
+ public static void putDoubleLE(long addr, double val) {
+ long longVal = Double.doubleToLongBits(val);
+
+ if (UNALIGNED)
+ UNSAFE.putLong(addr, Long.reverseBytes(longVal));
+ else
+ putLongByByte(addr, longVal, false);
+ }
+
+ /**
+ * Returns static field offset.
+ *
+ * @param field Field.
+ * @return Static field offset.
+ */
+ public static long staticFieldOffset(Field field) {
+ return UNSAFE.staticFieldOffset(field);
+ }
+
+ /**
+ * Returns object field offset.
+ *
+ * @param field Field.
+ * @return Object field offset.
+ */
+ public static long objectFieldOffset(Field field) {
+ return UNSAFE.objectFieldOffset(field);
+ }
+
+ /**
+ * Returns static field base.
+ *
+ * @param field Field.
+ * @return Static field base.
+ */
+ public static Object staticFieldBase(Field field) {
+ return UNSAFE.staticFieldBase(field);
+ }
+
+ /**
+ * Allocates memory.
+ *
+ * @param size Size.
+ * @return address.
+ */
+ public static long allocateMemory(long size) {
+ return UNSAFE.allocateMemory(size);
+ }
+
+ /**
+ * Reallocates memory.
+ *
+ * @param addr Address.
+ * @param len Length.
+ * @return address.
+ */
+ public static long reallocateMemory(long addr, long len) {
+ return UNSAFE.reallocateMemory(addr, len);
+ }
+
+ /**
+ * Fills memory with given value.
+ *
+ * @param addr Address.
+ * @param len Length.
+ * @param val Value.
+ */
+ public static void setMemory(long addr, long len, byte val) {
+ // Straight delegation to the Unsafe instance.
+ UNSAFE.setMemory(addr, len, val);
+ }
+
+ /**
+ * Copy memory between offheap locations.
+ *
+ * @param srcAddr Source address.
+ * @param dstAddr Destination address.
+ * @param len Length.
+ */
+ public static void copyOffheapOffheap(long srcAddr, long dstAddr, long len) {
+ if (len <= PER_BYTE_THRESHOLD) {
+ for (int i = 0; i < len; i++)
+ UNSAFE.putByte(dstAddr + i, UNSAFE.getByte(srcAddr + i));
+ }
+ else
+ UNSAFE.copyMemory(srcAddr, dstAddr, len);
+ }
+
+ /**
+ * Copy memory from offheap to heap.
+ *
+ * @param srcAddr Source address.
+ * @param dstBase Destination base.
+ * @param dstOff Destination offset.
+ * @param len Length.
+ */
+ public static void copyOffheapHeap(long srcAddr, Object dstBase, long dstOff, long len) {
+ if (len <= PER_BYTE_THRESHOLD) {
+ for (int i = 0; i < len; i++)
+ UNSAFE.putByte(dstBase, dstOff + i, UNSAFE.getByte(srcAddr + i));
+ }
+ else
+ UNSAFE.copyMemory(null, srcAddr, dstBase, dstOff, len);
+ }
+
+ /**
+ * Copy memory from heap to offheap.
+ *
+ * @param srcBase Source base.
+ * @param srcOff Source offset.
+ * @param dstAddr Destination address.
+ * @param len Length.
+ */
+ public static void copyHeapOffheap(Object srcBase, long srcOff, long dstAddr, long len) {
+ if (len <= PER_BYTE_THRESHOLD) {
+ for (int i = 0; i < len; i++)
+ UNSAFE.putByte(dstAddr + i, UNSAFE.getByte(srcBase, srcOff + i));
+ }
+ else
+ UNSAFE.copyMemory(srcBase, srcOff, null, dstAddr, len);
+ }
+
+ /**
+ * Copies memory.
+ *
+ * @param src Source.
+ * @param dst Dst.
+ * @param len Length.
+ */
+ public static void copyMemory(long src, long dst, long len) {
+ // Always uses the bulk copy routine; unlike the sibling copy helpers there is no per-byte path here.
+ UNSAFE.copyMemory(src, dst, len);
+ }
+
+ /**
+ * Sets all bytes in a given block of memory to a copy of another block.
+ *
+ * @param srcBase Source base.
+ * @param srcOff Source offset.
+ * @param dstBase Dst base.
+ * @param dstOff Dst offset.
+ * @param len Length.
+ */
+ public static void copyMemory(Object srcBase, long srcOff, Object dstBase, long dstOff, long len) {
+ if (len <= PER_BYTE_THRESHOLD && srcBase != null && dstBase != null) {
+ for (int i = 0; i < len; i++)
+ UNSAFE.putByte(dstBase, dstOff + i, UNSAFE.getByte(srcBase, srcOff + i));
+ }
+ else
+ UNSAFE.copyMemory(srcBase, srcOff, dstBase, dstOff, len);
+ }
+
+ /**
+ * Frees memory.
+ *
+ * @param addr Address.
+ */
+ public static void freeMemory(long addr) {
+ // Straight delegation to the Unsafe instance.
+ UNSAFE.freeMemory(addr);
+ }
+
+ /**
+ * Returns the offset of the first element in the storage allocation of a given array class.
+ *
+ * @param cls Class.
+ * @return the offset of the first element in the storage allocation of a given array class.
+ */
+ public static int arrayBaseOffset(Class cls) {
+ return UNSAFE.arrayBaseOffset(cls);
+ }
+
+ /**
+ * Allocates instance of given class.
+ *
+ * @param cls Class.
+ * @return Allocated instance.
+ */
+ public static Object allocateInstance(Class cls) throws InstantiationException {
+ return UNSAFE.allocateInstance(cls);
+ }
+
+ /**
+ * Integer CAS.
+ *
+ * @param obj Object.
+ * @param off Offset.
+ * @param exp Expected.
+ * @param upd Upd.
+ * @return {@code True} if operation completed successfully, {@code false} - otherwise.
+ */
+ public static boolean compareAndSwapInt(Object obj, long off, int exp, int upd) {
+ return UNSAFE.compareAndSwapInt(obj, off, exp, upd);
+ }
+
+ /**
+ * Long CAS.
+ *
+ * @param obj Object.
+ * @param off Offset.
+ * @param exp Expected.
+ * @param upd Upd.
+ * @return {@code True} if operation completed successfully, {@code false} - otherwise.
+ */
+ public static boolean compareAndSwapLong(Object obj, long off, long exp, long upd) {
+ return UNSAFE.compareAndSwapLong(obj, off, exp, upd);
+ }
+
+ /**
+ * Atomically increments value stored in an integer pointed by {@code ptr}.
+ *
+ * @param ptr Pointer to an integer.
+ * @return Updated value.
+ */
+ public static int incrementAndGetInt(long ptr) {
+ return UNSAFE.getAndAddInt(null, ptr, 1) + 1;
+ }
+
+ /**
+ * Atomically decrements value stored in an integer pointed by {@code ptr}.
+ *
+ * @param ptr Pointer to an integer.
+ * @return Updated value.
+ */
+ public static int decrementAndGetInt(long ptr) {
+ return UNSAFE.getAndAddInt(null, ptr, -1) - 1;
+ }
+
+ /**
+ * Gets byte value with volatile semantic.
+ *
+ * @param obj Object.
+ * @param off Offset.
+ * @return Byte value.
+ */
+ public static byte getByteVolatile(Object obj, long off) {
+ return UNSAFE.getByteVolatile(obj, off);
+ }
+
+ /**
+ * Stores byte value with volatile semantic.
+ *
+ * @param obj Object.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putByteVolatile(Object obj, long off, byte val) {
+ // Straight delegation to the Unsafe instance.
+ UNSAFE.putByteVolatile(obj, off, val);
+ }
+
+ /**
+ * Gets integer value with volatile semantic.
+ *
+ * @param obj Object.
+ * @param off Offset.
+ * @return Integer value.
+ */
+ public static int getIntVolatile(Object obj, long off) {
+ return UNSAFE.getIntVolatile(obj, off);
+ }
+
+ /**
+ * Stores integer value with volatile semantic.
+ *
+ * @param obj Object.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putIntVolatile(Object obj, long off, int val) {
+ // Straight delegation to the Unsafe instance.
+ UNSAFE.putIntVolatile(obj, off, val);
+ }
+
+ /**
+ * Gets long value with volatile semantic.
+ *
+ * @param obj Object.
+ * @param off Offset.
+ * @return Long value.
+ */
+ public static long getLongVolatile(Object obj, long off) {
+ return UNSAFE.getLongVolatile(obj, off);
+ }
+
+ /**
+ * Stores long value with volatile semantic.
+ *
+ * @param obj Object.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putLongVolatile(Object obj, long off, long val) {
+ // Straight delegation to the Unsafe instance.
+ UNSAFE.putLongVolatile(obj, off, val);
+ }
+
+ /**
+ * Stores reference value with volatile semantic.
+ *
+ * @param obj Object.
+ * @param off Offset.
+ * @param val Value.
+ */
+ public static void putObjectVolatile(Object obj, long off, Object val) {
+ // Straight delegation to the Unsafe instance.
+ UNSAFE.putObjectVolatile(obj, off, val);
+ }
+
+ /**
+ * Returns page size.
+ *
+ * @return Page size.
+ */
+ public static int pageSize() {
+ return UNSAFE.pageSize();
+ }
+
+ /**
+ * Returns address of {@link Buffer} instance.
+ *
+ * @param buf Buffer.
+ * @return Buffer memory address.
+ */
+ public static long bufferAddress(ByteBuffer buf) {
+ assert buf.isDirect();
+ return UNSAFE.getLong(buf, DIRECT_BUF_ADDR_OFF);
+ }
+
+ /**
+ * Invokes some method on {@code sun.misc.Unsafe} instance.
+ *
+ * @param mtd Method.
+ * @param args Arguments.
+ */
+ public static Object invoke(Method mtd, Object... args) {
+     Object res;
+
+     try {
+         // The Unsafe instance is always the invocation target.
+         res = mtd.invoke(UNSAFE, args);
+     }
+     catch (IllegalAccessException | InvocationTargetException e) {
+         throw new RuntimeException("Unsafe invocation failed [cls=" + UNSAFE.getClass() + ", mtd=" + mtd + ']', e);
+     }
+
+     return res;
+ }
+
+ /**
+ * Cleans direct {@code java.nio.ByteBuffer}
+ *
+ * @param buf Direct buffer.
+ */
+ public static void cleanDirectBuffer(ByteBuffer buf) {
+ // Only direct buffers are accepted; the check is active only when assertions are enabled.
+ assert buf.isDirect();
+
+ // The actual cleaning is delegated to DIRECT_BUF_CLEANER.
+ DIRECT_BUF_CLEANER.clean(buf);
+ }
+
+ /**
+ * Returns unaligned flag.
+ */
+ private static boolean unaligned() {
+     String osArch = System.getProperty("os.arch");
+
+     // 32-bit x86 names.
+     if ("i386".equals(osArch) || "x86".equals(osArch))
+         return true;
+
+     // 64-bit x86 names; any other (or missing) value yields false.
+     return "amd64".equals(osArch) || "x86_64".equals(osArch);
+ }
+
+ /**
+ * @return Instance of Unsafe class.
+ */
+ private static Unsafe unsafe() {
+     try {
+         return Unsafe.getUnsafe();
+     }
+     catch (SecurityException ignored) {
+         // Direct access was refused: read the static "theUnsafe" field reflectively inside a privileged action.
+         PrivilegedExceptionAction<Unsafe> reflectiveLookup = new PrivilegedExceptionAction<Unsafe>() {
+             @Override public Unsafe run() throws Exception {
+                 Field theUnsafeFld = Unsafe.class.getDeclaredField("theUnsafe");
+
+                 theUnsafeFld.setAccessible(true);
+
+                 return (Unsafe)theUnsafeFld.get(null);
+             }
+         };
+
+         try {
+             return AccessController.doPrivileged(reflectiveLookup);
+         }
+         catch (PrivilegedActionException e) {
+             // Surface the original failure rather than the privileged-action wrapper.
+             throw new RuntimeException("Could not initialize intrinsics.", e.getCause());
+         }
+     }
+ }
+
+ /** Returns the object field offset of the {@code address} field declared in {@link Buffer}. */
+ private static long bufferAddressOffset() {
+     // Probe buffer used to check that the reflected field actually carries a non-zero value.
+     final ByteBuffer probe = ByteBuffer.allocateDirect(1);
+
+     PrivilegedAction<Field> lookup = new PrivilegedAction<Field>() {
+         @Override public Field run() {
+             try {
+                 Field fld = Buffer.class.getDeclaredField("address");
+
+                 fld.setAccessible(true);
+
+                 long probeAddr = fld.getLong(probe);
+
+                 if (probeAddr == 0)
+                     throw new RuntimeException("java.nio.DirectByteBuffer.address field is unavailable.");
+
+                 return fld;
+             }
+             catch (Exception e) {
+                 // Any failure, including the zero-address check above, is reported the same way.
+                 throw new RuntimeException("java.nio.DirectByteBuffer.address field is unavailable.", e);
+             }
+         }
+     };
+
+     Field addrFld = AccessController.doPrivileged(lookup);
+
+     return UNSAFE.objectFieldOffset(addrFld);
+ }
+
+ /**
+ * Returns {@code JavaNioAccess} instance from private API for corresponding Java version.
+ *
+ * @return {@code JavaNioAccess} instance for corresponding Java version.
+ * @throws RuntimeException If getting access to the private API is failed.
+ */
+ private static Object javaNioAccessObject() {
+     String pkgName = miscPackage();
+
+     // NOTE(review): SharedSecrets appears to live outside "<pkg>.misc" on newer JDKs - confirm this lookup
+     // is only reached on versions where the class name still resolves.
+     try {
+         Method getter = Class.forName(pkgName + ".misc.SharedSecrets").getMethod("getJavaNioAccess");
+
+         // Invoked without a receiver.
+         return getter.invoke(null);
+     }
+     catch (ReflectiveOperationException e) {
+         throw new RuntimeException(pkgName + ".misc.JavaNioAccess class is unavailable."
+             + FeatureChecker.JAVA_VER_SPECIFIC_WARN, e);
+     }
+ }
+
+ /**
+ * Returns reference to {@code JavaNioAccess.newDirectByteBuffer} method
+ * from private API for corresponding Java version.
+ *
+ * @param nioAccessObj Java NIO access object.
+ * @return Reference to {@code JavaNioAccess.newDirectByteBuffer} method
+ * @throws RuntimeException If getting access to the private API is failed.
+ */
+ private static Method newDirectBufferMethod(Object nioAccessObj) {
+     try {
+         // The method is looked up on the runtime class of the supplied access object.
+         Class<?> cls = nioAccessObj.getClass();
+
+         Method mtd = cls.getMethod("newDirectByteBuffer", long.class, int.class, Object.class);
+
+         mtd.setAccessible(true);
+
+         return mtd;
+     }
+     catch (ReflectiveOperationException e) {
+         // Name the class as "<pkg>.misc.JavaNioAccess", matching the message built in javaNioAccessObject();
+         // the previous text dropped the ".misc" segment and pointed at a non-existent "<pkg>.JavaNioAccess".
+         throw new RuntimeException(miscPackage() + ".misc.JavaNioAccess#newDirectByteBuffer() method is unavailable."
+             + FeatureChecker.JAVA_VER_SPECIFIC_WARN, e);
+     }
+ }
+
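+ /** Returns the root package prefix ({@code "sun"} or {@code "jdk.internal"}) chosen by the major Java version. */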
+ @NotNull private static String miscPackage() {
+     // Major versions below 9 map to "sun", everything else to "jdk.internal".
+     if (majorJavaVersion(jdkVersion()) < 9)
+         return "sun";
+
+     return "jdk.internal";
+ }
+
+
+ /**
+ * Creates and tests constructor for Direct ByteBuffer. Test is wrapping one-byte unsafe memory into a buffer.
+ *
+ * @return constructor for creating direct ByteBuffers.
+ */
+ @NotNull
+ private static Constructor<?> createAndTestNewDirectBufferCtor() {
+ Constructor<?> ctorCandidate = createNewDirectBufferCtor();
+
+ int l = 1;
+ long ptr = UNSAFE.allocateMemory(l);
+
+ try {
+ ByteBuffer buf = wrapPointerDirectBufferConstructor(ptr, l, ctorCandidate);
+
+ if (!buf.isDirect())
+ throw new IllegalArgumentException("Buffer expected to be direct, internal error during #wrapPointerDirectBufCtor()");
+ }
+ finally {
+ UNSAFE.freeMemory(ptr);
+ }
+
+ return ctorCandidate;
+ }
+
+
+ /**
+ * Simply creates an instance of direct ByteBuffer and tries to get its class's declared constructor.
+ *
+ * @return constructor for creating direct ByteBuffers.
+ */
+ @NotNull
+ private static Constructor<?> createNewDirectBufferCtor() {
+     try {
+         // The concrete direct buffer class is taken from a freshly allocated instance.
+         Class<?> directBufCls = ByteBuffer.allocateDirect(1).order(NATIVE_BYTE_ORDER).getClass();
+
+         Constructor<?> res = directBufCls.getDeclaredConstructor(long.class, int.class);
+
+         res.setAccessible(true);
+
+         return res;
+     }
+     catch (NoSuchMethodException | SecurityException e) {
+         throw new RuntimeException("Unable to set up byte buffer creation using reflections :" + e.getMessage(), e);
+     }
+ }
+
+ /**
+ * @param obj Object.
+ * @param off Offset.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static short getShortByByte(Object obj, long off, boolean bigEndian) {
+ // The byte carrying the high-order bits is shifted as-is; the low-order byte is masked with 0xff to drop its sign extension.
+ if (bigEndian)
+ return (short)(UNSAFE.getByte(obj, off) << 8 | (UNSAFE.getByte(obj, off + 1) & 0xff));
+ else
+ return (short)(UNSAFE.getByte(obj, off + 1) << 8 | (UNSAFE.getByte(obj, off) & 0xff));
+ }
+
+ /**
+ * @param obj Object.
+ * @param off Offset.
+ * @param val Value.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static void putShortByByte(Object obj, long off, short val, boolean bigEndian) {
+ // Big-endian puts the high-order byte at the lower offset, little-endian at the higher one.
+ if (bigEndian) {
+ UNSAFE.putByte(obj, off, (byte)(val >> 8));
+ UNSAFE.putByte(obj, off + 1, (byte)val);
+ }
+ else {
+ UNSAFE.putByte(obj, off + 1, (byte)(val >> 8));
+ UNSAFE.putByte(obj, off, (byte)val);
+ }
+ }
+
+ /**
+ * @param obj Object.
+ * @param off Offset.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static char getCharByByte(Object obj, long off, boolean bigEndian) {
+ // The byte carrying the high-order bits is shifted as-is; the low-order byte is masked with 0xff to drop its sign extension.
+ if (bigEndian)
+ return (char)(UNSAFE.getByte(obj, off) << 8 | (UNSAFE.getByte(obj, off + 1) & 0xff));
+ else
+ return (char)(UNSAFE.getByte(obj, off + 1) << 8 | (UNSAFE.getByte(obj, off) & 0xff));
+ }
+
+ /**
+ * @param obj Object.
+ * @param addr Address.
+ * @param val Value.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static void putCharByByte(Object obj, long addr, char val, boolean bigEndian) {
+ // Big-endian puts the high-order byte at the lower offset, little-endian at the higher one.
+ if (bigEndian) {
+ UNSAFE.putByte(obj, addr, (byte)(val >> 8));
+ UNSAFE.putByte(obj, addr + 1, (byte)val);
+ }
+ else {
+ UNSAFE.putByte(obj, addr + 1, (byte)(val >> 8));
+ UNSAFE.putByte(obj, addr, (byte)val);
+ }
+ }
+
+ /**
+ * @param obj Object.
+ * @param addr Address.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static int getIntByByte(Object obj, long addr, boolean bigEndian) {
+ // The most significant byte needs no mask (its sign bits are shifted out); all other bytes are masked with 0xff.
+ if (bigEndian) {
+ return (((int)UNSAFE.getByte(obj, addr)) << 24) |
+ (((int)UNSAFE.getByte(obj, addr + 1) & 0xff) << 16) |
+ (((int)UNSAFE.getByte(obj, addr + 2) & 0xff) << 8) |
+ (((int)UNSAFE.getByte(obj, addr + 3) & 0xff));
+ }
+ else {
+ return (((int)UNSAFE.getByte(obj, addr + 3)) << 24) |
+ (((int)UNSAFE.getByte(obj, addr + 2) & 0xff) << 16) |
+ (((int)UNSAFE.getByte(obj, addr + 1) & 0xff) << 8) |
+ (((int)UNSAFE.getByte(obj, addr) & 0xff));
+ }
+ }
+
+ /**
+ * @param obj Object.
+ * @param addr Address.
+ * @param val Value.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static void putIntByByte(Object obj, long addr, int val, boolean bigEndian) {
+ // Big-endian puts the most significant byte at the lowest offset, little-endian at the highest one.
+ if (bigEndian) {
+ UNSAFE.putByte(obj, addr, (byte)(val >> 24));
+ UNSAFE.putByte(obj, addr + 1, (byte)(val >> 16));
+ UNSAFE.putByte(obj, addr + 2, (byte)(val >> 8));
+ UNSAFE.putByte(obj, addr + 3, (byte)(val));
+ }
+ else {
+ UNSAFE.putByte(obj, addr + 3, (byte)(val >> 24));
+ UNSAFE.putByte(obj, addr + 2, (byte)(val >> 16));
+ UNSAFE.putByte(obj, addr + 1, (byte)(val >> 8));
+ UNSAFE.putByte(obj, addr, (byte)(val));
+ }
+ }
+
+ /**
+ * @param obj Object.
+ * @param addr Address.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static long getLongByByte(Object obj, long addr, boolean bigEndian) {
+ // The most significant byte needs no mask (its sign bits are shifted out); all other bytes are masked with 0xff.
+ if (bigEndian) {
+ return (((long)UNSAFE.getByte(obj, addr)) << 56) |
+ (((long)UNSAFE.getByte(obj, addr + 1) & 0xff) << 48) |
+ (((long)UNSAFE.getByte(obj, addr + 2) & 0xff) << 40) |
+ (((long)UNSAFE.getByte(obj, addr + 3) & 0xff) << 32) |
+ (((long)UNSAFE.getByte(obj, addr + 4) & 0xff) << 24) |
+ (((long)UNSAFE.getByte(obj, addr + 5) & 0xff) << 16) |
+ (((long)UNSAFE.getByte(obj, addr + 6) & 0xff) << 8) |
+ (((long)UNSAFE.getByte(obj, addr + 7) & 0xff));
+ }
+ else {
+ return (((long)UNSAFE.getByte(obj, addr + 7)) << 56) |
+ (((long)UNSAFE.getByte(obj, addr + 6) & 0xff) << 48) |
+ (((long)UNSAFE.getByte(obj, addr + 5) & 0xff) << 40) |
+ (((long)UNSAFE.getByte(obj, addr + 4) & 0xff) << 32) |
+ (((long)UNSAFE.getByte(obj, addr + 3) & 0xff) << 24) |
+ (((long)UNSAFE.getByte(obj, addr + 2) & 0xff) << 16) |
+ (((long)UNSAFE.getByte(obj, addr + 1) & 0xff) << 8) |
+ (((long)UNSAFE.getByte(obj, addr) & 0xff));
+ }
+ }
+
+ /**
+ * @param obj Object.
+ * @param addr Address.
+ * @param val Value.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static void putLongByByte(Object obj, long addr, long val, boolean bigEndian) {
+ // Big-endian puts the most significant byte at the lowest offset, little-endian at the highest one.
+ if (bigEndian) {
+ UNSAFE.putByte(obj, addr, (byte)(val >> 56));
+ UNSAFE.putByte(obj, addr + 1, (byte)(val >> 48));
+ UNSAFE.putByte(obj, addr + 2, (byte)(val >> 40));
+ UNSAFE.putByte(obj, addr + 3, (byte)(val >> 32));
+ UNSAFE.putByte(obj, addr + 4, (byte)(val >> 24));
+ UNSAFE.putByte(obj, addr + 5, (byte)(val >> 16));
+ UNSAFE.putByte(obj, addr + 6, (byte)(val >> 8));
+ UNSAFE.putByte(obj, addr + 7, (byte)(val));
+ }
+ else {
+ UNSAFE.putByte(obj, addr + 7, (byte)(val >> 56));
+ UNSAFE.putByte(obj, addr + 6, (byte)(val >> 48));
+ UNSAFE.putByte(obj, addr + 5, (byte)(val >> 40));
+ UNSAFE.putByte(obj, addr + 4, (byte)(val >> 32));
+ UNSAFE.putByte(obj, addr + 3, (byte)(val >> 24));
+ UNSAFE.putByte(obj, addr + 2, (byte)(val >> 16));
+ UNSAFE.putByte(obj, addr + 1, (byte)(val >> 8));
+ UNSAFE.putByte(obj, addr, (byte)(val));
+ }
+ }
+
+ /**
+ * @param addr Address.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static short getShortByByte(long addr, boolean bigEndian) {
+ // The byte carrying the high-order bits is shifted as-is; the low-order byte is masked with 0xff to drop its sign extension.
+ if (bigEndian)
+ return (short)(UNSAFE.getByte(addr) << 8 | (UNSAFE.getByte(addr + 1) & 0xff));
+ else
+ return (short)(UNSAFE.getByte(addr + 1) << 8 | (UNSAFE.getByte(addr) & 0xff));
+ }
+
+ /**
+ * @param addr Address.
+ * @param val Value.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static void putShortByByte(long addr, short val, boolean bigEndian) {
+ // Big-endian puts the high-order byte at the lower address, little-endian at the higher one.
+ if (bigEndian) {
+ UNSAFE.putByte(addr, (byte)(val >> 8));
+ UNSAFE.putByte(addr + 1, (byte)val);
+ }
+ else {
+ UNSAFE.putByte(addr + 1, (byte)(val >> 8));
+ UNSAFE.putByte(addr, (byte)val);
+ }
+ }
+
+ /**
+ * @param addr Address.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static char getCharByByte(long addr, boolean bigEndian) {
+ // The byte carrying the high-order bits is shifted as-is; the low-order byte is masked with 0xff to drop its sign extension.
+ if (bigEndian)
+ return (char)(UNSAFE.getByte(addr) << 8 | (UNSAFE.getByte(addr + 1) & 0xff));
+ else
+ return (char)(UNSAFE.getByte(addr + 1) << 8 | (UNSAFE.getByte(addr) & 0xff));
+ }
+
+ /**
+ * @param addr Address.
+ * @param val Value.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static void putCharByByte(long addr, char val, boolean bigEndian) {
+ // Big-endian puts the high-order byte at the lower address, little-endian at the higher one.
+ if (bigEndian) {
+ UNSAFE.putByte(addr, (byte)(val >> 8));
+ UNSAFE.putByte(addr + 1, (byte)val);
+ }
+ else {
+ UNSAFE.putByte(addr + 1, (byte)(val >> 8));
+ UNSAFE.putByte(addr, (byte)val);
+ }
+ }
+
+ /**
+ * @param addr Address.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static int getIntByByte(long addr, boolean bigEndian) {
+ // The most significant byte needs no mask (its sign bits are shifted out); all other bytes are masked with 0xff.
+ if (bigEndian) {
+ return (((int)UNSAFE.getByte(addr)) << 24) |
+ (((int)UNSAFE.getByte(addr + 1) & 0xff) << 16) |
+ (((int)UNSAFE.getByte(addr + 2) & 0xff) << 8) |
+ (((int)UNSAFE.getByte(addr + 3) & 0xff));
+ }
+ else {
+ return (((int)UNSAFE.getByte(addr + 3)) << 24) |
+ (((int)UNSAFE.getByte(addr + 2) & 0xff) << 16) |
+ (((int)UNSAFE.getByte(addr + 1) & 0xff) << 8) |
+ (((int)UNSAFE.getByte(addr) & 0xff));
+ }
+ }
+
+ /**
+ * @param addr Address.
+ * @param val Value.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static void putIntByByte(long addr, int val, boolean bigEndian) {
+ // Big-endian puts the most significant byte at the lowest address, little-endian at the highest one.
+ if (bigEndian) {
+ UNSAFE.putByte(addr, (byte)(val >> 24));
+ UNSAFE.putByte(addr + 1, (byte)(val >> 16));
+ UNSAFE.putByte(addr + 2, (byte)(val >> 8));
+ UNSAFE.putByte(addr + 3, (byte)(val));
+ }
+ else {
+ UNSAFE.putByte(addr + 3, (byte)(val >> 24));
+ UNSAFE.putByte(addr + 2, (byte)(val >> 16));
+ UNSAFE.putByte(addr + 1, (byte)(val >> 8));
+ UNSAFE.putByte(addr, (byte)(val));
+ }
+ }
+
+ /**
+ * @param addr Address.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static long getLongByByte(long addr, boolean bigEndian) {
+ // The most significant byte needs no mask (its sign bits are shifted out); all other bytes are masked with 0xff.
+ if (bigEndian) {
+ return (((long)UNSAFE.getByte(addr)) << 56) |
+ (((long)UNSAFE.getByte(addr + 1) & 0xff) << 48) |
+ (((long)UNSAFE.getByte(addr + 2) & 0xff) << 40) |
+ (((long)UNSAFE.getByte(addr + 3) & 0xff) << 32) |
+ (((long)UNSAFE.getByte(addr + 4) & 0xff) << 24) |
+ (((long)UNSAFE.getByte(addr + 5) & 0xff) << 16) |
+ (((long)UNSAFE.getByte(addr + 6) & 0xff) << 8) |
+ (((long)UNSAFE.getByte(addr + 7) & 0xff));
+ }
+ else {
+ return (((long)UNSAFE.getByte(addr + 7)) << 56) |
+ (((long)UNSAFE.getByte(addr + 6) & 0xff) << 48) |
+ (((long)UNSAFE.getByte(addr + 5) & 0xff) << 40) |
+ (((long)UNSAFE.getByte(addr + 4) & 0xff) << 32) |
+ (((long)UNSAFE.getByte(addr + 3) & 0xff) << 24) |
+ (((long)UNSAFE.getByte(addr + 2) & 0xff) << 16) |
+ (((long)UNSAFE.getByte(addr + 1) & 0xff) << 8) |
+ (((long)UNSAFE.getByte(addr) & 0xff));
+ }
+ }
+
+ /**
+ * @param addr Address.
+ * @param val Value.
+ * @param bigEndian Order of value bytes in memory. If {@code true} - big-endian, otherwise little-endian.
+ */
+ private static void putLongByByte(long addr, long val, boolean bigEndian) {
+ // Big-endian puts the most significant byte at the lowest address, little-endian at the highest one.
+ if (bigEndian) {
+ UNSAFE.putByte(addr, (byte)(val >> 56));
+ UNSAFE.putByte(addr + 1, (byte)(val >> 48));
+ UNSAFE.putByte(addr + 2, (byte)(val >> 40));
+ UNSAFE.putByte(addr + 3, (byte)(val >> 32));
+ UNSAFE.putByte(addr + 4, (byte)(val >> 24));
+ UNSAFE.putByte(addr + 5, (byte)(val >> 16));
+ UNSAFE.putByte(addr + 6, (byte)(val >> 8));
+ UNSAFE.putByte(addr + 7, (byte)(val));
+ }
+ else {
+ UNSAFE.putByte(addr + 7, (byte)(val >> 56));
+ UNSAFE.putByte(addr + 6, (byte)(val >> 48));
+ UNSAFE.putByte(addr + 5, (byte)(val >> 40));
+ UNSAFE.putByte(addr + 4, (byte)(val >> 32));
+ UNSAFE.putByte(addr + 3, (byte)(val >> 24));
+ UNSAFE.putByte(addr + 2, (byte)(val >> 16));
+ UNSAFE.putByte(addr + 1, (byte)(val >> 8));
+ UNSAFE.putByte(addr, (byte)(val));
+ }
+ }
+
+ /**
+ * @param ptr1 First pointer.
+ * @param ptr2 Second pointer.
+ * @param size Memory size.
+ * @return {@code True} if equals.
+ */
+ public static boolean compare(long ptr1, long ptr2, int size) {
+     assert ptr1 > 0 : ptr1;
+     assert ptr2 > 0 : ptr2;
+     assert size > 0 : size;
+
+     // Same address - same content.
+     if (ptr1 == ptr2)
+         return true;
+
+     int off = 0;
+
+     // Compare as many whole 8-byte words as fit into the region.
+     for (int wordsEnd = size - size % 8; off < wordsEnd; off += 8) {
+         if (getLong(ptr1 + off) != getLong(ptr2 + off))
+             return false;
+     }
+
+     // Compare the remaining tail (fewer than 8 bytes) one byte at a time.
+     for (; off < size; off++) {
+         if (getByte(ptr1 + off) != getByte(ptr2 + off))
+             return false;
+     }
+
+     return true;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
new file mode 100644
index 0000000..7f34bc8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+
+/**
+ * Collection of utility methods used throughout the system.
+ */
+public class IgniteUtils {
+ /** Version of the JDK. */
+ private static String jdkVer;
+
+ /*
+ * Initializes enterprise check.
+ */
+ static {
+ IgniteUtils.jdkVer = System.getProperty("java.specification.version");
+ }
+
+ /**
+ * Get JDK version.
+ *
+ * @return JDK version.
+ */
+ public static String jdkVersion() {
+ return jdkVer;
+ }
+
+ /**
+ * Get major Java version from a string.
+ *
+ * @param verStr Version string.
+ * @return Major version or zero if failed to resolve.
+ */
+ public static int majorJavaVersion(String verStr) {
+ if (verStr == null || verStr.isEmpty())
+ return 0;
+
+ try {
+ String[] parts = verStr.split("\\.");
+
+ int major = Integer.parseInt(parts[0]);
+
+ if (parts.length == 1)
+ return major;
+
+ int minor = Integer.parseInt(parts[1]);
+
+ return major == 1 ? minor : major;
+ }
+ catch (Exception e) {
+ return 0;
+ }
+ }
+
+ /**
+ * Returns a capacity that is sufficient to keep the map from being resized as
+ * long as it grows no larger than expSize and the load factor is >= its
+ * default (0.75).
+ *
+ * Copy pasted from guava. See com.google.common.collect.Maps#capacity(int)
+ *
+ * @param expSize Expected size of the created map.
+ * @return Capacity.
+ */
+ public static int capacity(int expSize) {
+ if (expSize < 3)
+ return expSize + 1;
+
+ if (expSize < (1 << 30))
+ return expSize + expSize / 3;
+
+ return Integer.MAX_VALUE; // any large value
+ }
+
+ /**
+ * Creates new {@link HashMap} with expected size.
+ *
+ * @param expSize Expected size of the created map.
+ * @param <K> Type of the map's keys.
+ * @param <V> Type of the map's values.
+ * @return New map.
+ */
+ public static <K, V> HashMap<K, V> newHashMap(int expSize) {
+ return new HashMap<>(capacity(expSize));
+ }
+
+ /**
+ * Creates new {@link LinkedHashMap} with expected size.
+ *
+ * @param expSize Expected size of created map.
+ * @param <K> Type of the map's keys.
+ * @param <V> Type of the map's values.
+ * @return New map.
+ */
+ public static <K, V> LinkedHashMap<K, V> newLinkedHashMap(int expSize) {
+ return new LinkedHashMap<>(capacity(expSize));
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ReflectiveDirectBufferCleaner.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ReflectiveDirectBufferCleaner.java
new file mode 100644
index 0000000..017a7b0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ReflectiveDirectBufferCleaner.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+
+/**
+ * {@link DirectBufferCleaner} implementation based on the {@code sun.misc.Cleaner} and the
+ * {@code sun.nio.ch.DirectBuffer.cleaner()} method.
+ *
+ * Note: This implementation will not work on Java 9+.
+ */
+public class ReflectiveDirectBufferCleaner implements DirectBufferCleaner {
+ /** Cleaner method. */
+ private final Method cleanerMtd;
+
+ /** Clean method. */
+ private final Method cleanMtd;
+
+ /** */
+ public ReflectiveDirectBufferCleaner() {
+ try {
+ cleanerMtd = Class.forName("sun.nio.ch.DirectBuffer").getMethod("cleaner");
+
+ }
+ catch (ClassNotFoundException | NoSuchMethodException e) {
+ throw new RuntimeException("No sun.nio.ch.DirectBuffer.cleaner() method found", e);
+ }
+
+ try {
+ cleanMtd = Class.forName("sun.misc.Cleaner").getMethod("clean");
+ }
+ catch (ClassNotFoundException | NoSuchMethodException e) {
+ throw new RuntimeException("No sun.misc.Cleaner.clean() method found", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clean(ByteBuffer buf) {
+ try {
+ cleanMtd.invoke(cleanerMtd.invoke(buf));
+ }
+ catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException("Failed to invoke direct buffer cleaner", e);
+ }
+ }
+}
+
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/util/UnsafeDirectBufferCleaner.java
similarity index 50%
rename from modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageReader.java
rename to modules/core/src/main/java/org/apache/ignite/internal/util/UnsafeDirectBufferCleaner.java
index 4123be1..a340fa4 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/UnsafeDirectBufferCleaner.java
@@ -15,24 +15,34 @@
* limitations under the License.
*/
-package org.apache.ignite.network.scalecube;
+package org.apache.ignite.internal.util;
-import java.io.ObjectInputStream;
-import org.apache.ignite.network.internal.MessageReader;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import sun.misc.Unsafe;
-/** */
-@Deprecated
-public class ScaleCubeMessageReader implements MessageReader {
- /** */
- private final ObjectInputStream stream;
+/**
+ * {@link DirectBufferCleaner} implementation based on the {@code Unsafe.invokeCleaner} method.
+ *
+ * Note: This implementation will work only for Java 9+.
+ */
+public class UnsafeDirectBufferCleaner implements DirectBufferCleaner {
+ /** Cleaner method. */
+ private final Method cleanerMtd;
/** */
- public ScaleCubeMessageReader(ObjectInputStream stream) {
- this.stream = stream;
+ public UnsafeDirectBufferCleaner() {
+ try {
+ cleanerMtd = Unsafe.class.getMethod("invokeCleaner", ByteBuffer.class);
+ }
+ catch (NoSuchMethodException e) {
+ throw new RuntimeException("Reflection failure: no sun.misc.Unsafe.invokeCleaner() method found", e);
+ }
}
/** {@inheritDoc} */
- @Override public ObjectInputStream stream() {
- return this.stream;
+ @Override public void clean(ByteBuffer buf) {
+ GridUnsafe.invoke(cleanerMtd, buf);
}
}
+
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java
new file mode 100644
index 0000000..63f791c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.extensions.communication;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Enum representing possible types of collection items.
+ */
+public enum MessageCollectionItemType {
+ /** Byte. */
+ BYTE,
+
+ /** Short. */
+ SHORT,
+
+ /** Integer. */
+ INT,
+
+ /** Long. */
+ LONG,
+
+ /** Float. */
+ FLOAT,
+
+ /** Double. */
+ DOUBLE,
+
+ /** Character. */
+ CHAR,
+
+ /** Boolean. */
+ BOOLEAN,
+
+ /** Byte array. */
+ BYTE_ARR,
+
+ /** Short array. */
+ SHORT_ARR,
+
+ /** Integer array. */
+ INT_ARR,
+
+ /** Long array. */
+ LONG_ARR,
+
+ /** Float array. */
+ FLOAT_ARR,
+
+ /** Double array. */
+ DOUBLE_ARR,
+
+ /** Character array. */
+ CHAR_ARR,
+
+ /** Boolean array. */
+ BOOLEAN_ARR,
+
+ /** String. */
+ STRING,
+
+ /** Bit set. */
+ BIT_SET,
+
+ /** UUID. */
+ UUID,
+
+ /** Ignite UUID. */
+ IGNITE_UUID,
+
+ /** Message. */
+ MSG;
+
+ /** Enum values. */
+ private static final MessageCollectionItemType[] VALS = values();
+
+ /**
+ * Efficiently gets enumerated value from its ordinal.
+ *
+ * @param ord Ordinal value.
+ * @return Enumerated value.
+ */
+ @Nullable
+ public static MessageCollectionItemType fromOrdinal(int ord) {
+ return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+ }
+}
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index 314a169..93547f1 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -42,6 +42,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- 3-rd party dependencies. -->
<dependency>
<groupId>io.scalecube</groupId>
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/DirectSerializationTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/DirectSerializationTest.java
new file mode 100644
index 0000000..80ddbd9
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/DirectSerializationTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.network.internal.MessageSerializerFactory;
+import org.apache.ignite.network.internal.direct.DirectMessageReader;
+import org.apache.ignite.network.internal.direct.DirectMessageWriter;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.MessageSerializerProvider;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.TestMessage;
+import org.apache.ignite.network.scalecube.TestMessageSerializerProvider;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test Direct Message Writing/Reading works. This test won't be needed after we implement Netty Transport
+ * for Ignite (IGNITE-14088).
+ */
+public class DirectSerializationTest {
+ /** */
+ @Test
+ public void test() {
+ MessageSerializerProvider[] messageMapperProviders = new MessageSerializerProvider[Short.MAX_VALUE << 1];
+
+ TestMessageSerializerProvider tProv = new TestMessageSerializerProvider();
+
+ messageMapperProviders[TestMessage.TYPE] = tProv;
+
+ MessageSerializerFactory factory = new MessageSerializerFactory(Arrays.asList(messageMapperProviders));
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 10_000; i++) {
+ sb.append("a");
+ }
+ String msgStr = sb.toString();
+
+ Map<Integer, String> someMap = new HashMap<>();
+
+ for (int i = 0; i < 26; i++) {
+ someMap.put(i, "" + (char) ('a' + i));
+ }
+
+ TestMessage message = new TestMessage(msgStr, someMap);
+ short directType = message.directType();
+
+ DirectMessageWriter writer = new DirectMessageWriter((byte) 1);
+ MessageSerializer<NetworkMessage> serializer = factory.createSerializer(directType);
+
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);
+
+ boolean writing = true;
+ while (writing && byteBuffer.capacity() < 20_000) {
+ writer.setBuffer(byteBuffer);
+ writing = !serializer.writeMessage(message, writer);
+
+ if (writing) {
+ byteBuffer.flip();
+ ByteBuffer tmp = ByteBuffer.allocateDirect(byteBuffer.capacity() + 4096);
+ tmp.put(byteBuffer);
+ byteBuffer = tmp;
+ }
+ }
+
+ assertFalse(writing);
+
+ byteBuffer.flip();
+
+ DirectMessageReader reader = new DirectMessageReader(factory, (byte) 1);
+ reader.setBuffer(byteBuffer);
+
+ byte type1 = byteBuffer.get();
+ byte type2 = byteBuffer.get();
+
+ short messageType = makeMessageType(type1, type2);
+
+ MessageDeserializer<NetworkMessage> deserializer = factory.createDeserializer(messageType);
+ boolean read = deserializer.readMessage(reader);
+
+ assertTrue(read);
+
+ TestMessage readMessage = (TestMessage) deserializer.getMessage();
+
+ assertEquals(message.msg(), readMessage.msg());
+ assertTrue(message.getMap().equals(readMessage.getMap()));
+ }
+
+ /**
+ * Concatenates the two parameter bytes to form a message type value.
+ *
+ * @param b0 The first byte.
+ * @param b1 The second byte.
+ */
+ public static short makeMessageType(byte b0, byte b1) {
+ return (short)((b1 & 0xFF) << 8 | b0 & 0xFF);
+ }
+}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
index 792bd6f..f6886d5 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java
@@ -28,8 +28,8 @@ import org.apache.ignite.network.NetworkCluster;
import org.apache.ignite.network.NetworkClusterEventHandler;
import org.apache.ignite.network.NetworkHandlersProvider;
import org.apache.ignite.network.NetworkMember;
-import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.message.NetworkMessage;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -88,7 +88,7 @@ class ITScaleCubeNetworkClusterMessagingTest {
waitForCluster(alice);
- TestMessage sentMessage = new TestMessage("Message from Alice");
+ TestMessage sentMessage = new TestMessage("Message from Alice", null);
//When: Send one message to all members in cluster.
for (NetworkMember member : alice.allMembers()) {
@@ -118,9 +118,9 @@ class ITScaleCubeNetworkClusterMessagingTest {
new ScaleCubeNetworkClusterFactory(name, port, addresses, new ScaleCubeMemberResolver())
);
- network.registerMessageMapper(TestMessage.TYPE, new TestMessageMapperProvider());
- network.registerMessageMapper(TestRequest.TYPE, new TestRequestMapperProvider());
- network.registerMessageMapper(TestResponse.TYPE, new TestResponseMapperProvider());
+ network.registerMessageMapper(TestMessage.TYPE, new TestMessageSerializerProvider());
+ network.registerMessageMapper(TestRequest.TYPE, new TestRequestSerializerProvider());
+ network.registerMessageMapper(TestResponse.TYPE, new TestResponseSerializerProvider());
NetworkCluster member = network.start();
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java
index d90ab12..342c214 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java
@@ -18,49 +18,57 @@
package org.apache.ignite.network.scalecube;
+import java.io.Serializable;
+import java.util.Map;
import java.util.Objects;
import org.apache.ignite.network.message.NetworkMessage;
/** */
-class TestMessage implements NetworkMessage {
+public class TestMessage implements NetworkMessage, Serializable {
/** Visible type for tests. */
public static final short TYPE = 3;
/** */
private final String msg;
+ private final Map<Integer, String> map;
+
/** */
- TestMessage(String msg) {
+ public TestMessage(String msg, Map<Integer, String> map) {
this.msg = msg;
+ this.map = map;
}
public String msg() {
return msg;
}
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
- TestMessage message = (TestMessage)o;
- return Objects.equals(msg, message.msg);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return Objects.hash(msg);
+ public Map<Integer, String> getMap() {
+ return map;
}
/** {@inheritDoc} */
@Override public String toString() {
return "TestMessage{" +
"msg='" + msg + '\'' +
+ ", map=" + map +
'}';
}
/** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TestMessage message = (TestMessage) o;
+ return Objects.equals(msg, message.msg) && Objects.equals(map, message.map);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(msg, map);
+ }
+
+ /** {@inheritDoc} */
@Override public short directType() {
return TYPE;
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessageMapperProvider.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessageMapperProvider.java
deleted file mode 100644
index f398aaf..0000000
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessageMapperProvider.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network.scalecube;
-
-import java.io.IOException;
-import org.apache.ignite.network.message.MessageDeserializer;
-import org.apache.ignite.network.message.MessageMapperProvider;
-import org.apache.ignite.network.message.MessageMappingException;
-import org.apache.ignite.network.message.MessageSerializer;
-
-/**
- * Mapper for {@link TestMessage}.
- */
-public class TestMessageMapperProvider implements MessageMapperProvider<TestMessage> {
- /** {@inheritDoc} */
- @Override public MessageDeserializer<TestMessage> createDeserializer() {
- return reader -> {
- try {
- final String str = (String) reader.stream().readObject();
- return new TestMessage(str);
- }
- catch (IOException | ClassNotFoundException e) {
- throw new MessageMappingException("Failed to deserialize", e);
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public MessageSerializer<TestMessage> createSerializer() {
- return (message, writer) -> {
- try {
- writer.stream().writeObject(message.msg());
- }
- catch (IOException e) {
- throw new MessageMappingException("Failed to serialize", e);
- }
- };
- }
-}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessageSerializerProvider.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessageSerializerProvider.java
new file mode 100644
index 0000000..c37458c
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessageSerializerProvider.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import java.util.Map;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageMappingException;
+import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.MessageSerializerProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+
+/**
+ * Mapper for {@link TestMessage}.
+ */
+public class TestMessageSerializerProvider implements MessageSerializerProvider<TestMessage> {
+ /** {@inheritDoc} */
+ @Override public MessageDeserializer<TestMessage> createDeserializer() {
+ return new MessageDeserializer<TestMessage>() {
+
+ private TestMessage obj;
+
+ private String msg;
+ private Map<Integer, String> map;
+
+ @Override
+ public boolean readMessage(MessageReader reader) throws MessageMappingException {
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ map = reader.readMap("map", MessageCollectionItemType.INT, MessageCollectionItemType.STRING, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ //noinspection fallthrough
+ case 1:
+ msg = reader.readString("msg");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ obj = new TestMessage(msg, map);
+
+ return reader.afterMessageRead(TestMessage.class);
+ }
+
+ @Override
+ public Class<TestMessage> klass() {
+ return TestMessage.class;
+ }
+
+ @Override
+ public TestMessage getMessage() {
+ return obj;
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessageSerializer<TestMessage> createSerializer() {
+ return (message, writer) -> {
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(message.directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeMap("map", message.getMap(), MessageCollectionItemType.INT, MessageCollectionItemType.STRING))
+ return false;
+
+ writer.incrementState();
+
+ //noinspection fallthrough
+ case 1:
+ if (!writer.writeString("msg", message.msg()))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequestMapperProvider.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequestSerializerProvider.java
similarity index 59%
rename from modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequestMapperProvider.java
rename to modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequestSerializerProvider.java
index 7ea440c..36585af 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequestMapperProvider.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequestSerializerProvider.java
@@ -17,38 +17,26 @@
package org.apache.ignite.network.scalecube;
-import java.io.IOException;
import org.apache.ignite.network.message.MessageDeserializer;
-import org.apache.ignite.network.message.MessageMapperProvider;
-import org.apache.ignite.network.message.MessageMappingException;
import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.MessageSerializerProvider;
/**
* Mapper for {@link TestRequest}.
*/
-public class TestRequestMapperProvider implements MessageMapperProvider<TestRequest> {
+public class TestRequestSerializerProvider implements MessageSerializerProvider<TestRequest> {
/** {@inheritDoc} */
@Override public MessageDeserializer<TestRequest> createDeserializer() {
- return reader -> {
- try {
- final int number = reader.stream().readInt();
- return new TestRequest(number);
- }
- catch (IOException e) {
- throw new MessageMappingException("Failed to deserialize", e);
- }
- };
+ return null;
}
/** {@inheritDoc} */
@Override public MessageSerializer<TestRequest> createSerializer() {
- return (message, writer) -> {
- try {
- writer.stream().writeInt(message.number());
- }
- catch (IOException e) {
- throw new MessageMappingException("Failed to serialize", e);
- }
- };
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 0;
}
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponseMapperProvider.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponseSerializerProvider.java
similarity index 59%
rename from modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponseMapperProvider.java
rename to modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponseSerializerProvider.java
index 054ac73..7eb8626 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponseMapperProvider.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponseSerializerProvider.java
@@ -17,38 +17,26 @@
package org.apache.ignite.network.scalecube;
-import java.io.IOException;
import org.apache.ignite.network.message.MessageDeserializer;
-import org.apache.ignite.network.message.MessageMapperProvider;
-import org.apache.ignite.network.message.MessageMappingException;
import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.MessageSerializerProvider;
/**
* Mapper provider for {@link TestResponse}.
*/
-public class TestResponseMapperProvider implements MessageMapperProvider<TestResponse> {
+public class TestResponseSerializerProvider implements MessageSerializerProvider<TestResponse> {
/** {@inheritDoc} */
@Override public MessageDeserializer<TestResponse> createDeserializer() {
- return reader -> {
- try {
- final int number = reader.stream().readInt();
- return new TestResponse(number);
- }
- catch (IOException e) {
- throw new MessageMappingException("Failed to deserialize", e);
- }
- };
+ return null;
}
/** {@inheritDoc} */
@Override public MessageSerializer<TestResponse> createSerializer() {
- return (message, writer) -> {
- try {
- writer.stream().writeInt(message.responseNumber());
- }
- catch (IOException e) {
- throw new MessageMappingException("Failed to serialize", e);
- }
- };
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 0;
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/Network.java b/modules/network/src/main/java/org/apache/ignite/network/Network.java
index 660c36e..6d465da 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/Network.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/Network.java
@@ -19,14 +19,14 @@ package org.apache.ignite.network;
import java.util.Arrays;
import java.util.Collections;
-import org.apache.ignite.network.message.MessageMapperProvider;
+import org.apache.ignite.network.message.MessageSerializerProvider;
/**
* Entry point for network module.
*/
public class Network {
/** Message mapper providers, messageMapperProviders[message type] -> message mapper provider for message with message type. */
- private final MessageMapperProvider<?>[] messageMapperProviders = new MessageMapperProvider<?>[Short.MAX_VALUE << 1];
+ private final MessageSerializerProvider<?>[] messageSerializerProviders = new MessageSerializerProvider<?>[Short.MAX_VALUE << 1];
/** Message handlers. */
private final MessageHandlerHolder messageHandlerHolder = new MessageHandlerHolder();
@@ -47,11 +47,11 @@ public class Network {
* @param type Message type.
* @param mapperProvider Message mapper provider.
*/
- public void registerMessageMapper(short type, MessageMapperProvider mapperProvider) throws NetworkConfigurationException {
- if (this.messageMapperProviders[type] != null)
+ public void registerMessageMapper(short type, MessageSerializerProvider mapperProvider) throws NetworkConfigurationException {
+ if (this.messageSerializerProviders[type] != null)
throw new NetworkConfigurationException("Message mapper for type " + type + " is already defined");
- this.messageMapperProviders[type] = mapperProvider;
+ this.messageSerializerProviders[type] = mapperProvider;
}
/**
@@ -60,7 +60,7 @@ public class Network {
*/
public NetworkCluster start() {
//noinspection Java9CollectionFactory
- NetworkClusterContext context = new NetworkClusterContext(messageHandlerHolder, Collections.unmodifiableList(Arrays.asList(messageMapperProviders)));
+ NetworkClusterContext context = new NetworkClusterContext(messageHandlerHolder, Collections.unmodifiableList(Arrays.asList(messageSerializerProviders)));
return clusterFactory.startCluster(context);
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterContext.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterContext.java
index fcdf591..5d3ae4a 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterContext.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterContext.java
@@ -19,7 +19,7 @@ package org.apache.ignite.network;
import java.util.Collections;
import java.util.List;
-import org.apache.ignite.network.message.MessageMapperProvider;
+import org.apache.ignite.network.message.MessageSerializerProvider;
/**
* Cluster context.
@@ -29,16 +29,16 @@ public class NetworkClusterContext {
private final MessageHandlerHolder messageHandlerHolder;
/** Message mappers, messageMapperProviders[message type] -> message mapper provider for message with message type. */
- private final List<MessageMapperProvider<?>> messageMapperProviders;
+ private final List<MessageSerializerProvider<?>> messageSerializerProviders;
/**
* Constructor.
* @param messageHandlerHolder Message handlers.
- * @param messageMapperProviders Message mappers map.
+ * @param messageSerializerProviders Message serializer providers list, indexed by message type.
*/
- public NetworkClusterContext(MessageHandlerHolder messageHandlerHolder, List<MessageMapperProvider<?>> messageMapperProviders) {
+ public NetworkClusterContext(MessageHandlerHolder messageHandlerHolder, List<MessageSerializerProvider<?>> messageSerializerProviders) {
this.messageHandlerHolder = messageHandlerHolder;
- this.messageMapperProviders = messageMapperProviders;
+ this.messageSerializerProviders = messageSerializerProviders;
}
/**
@@ -51,7 +51,7 @@ public class NetworkClusterContext {
/**
* @return Message mapper providers list.
*/
- public List<MessageMapperProvider<?>> messageMapperProviders() {
- return Collections.unmodifiableList(messageMapperProviders);
+ public List<MessageSerializerProvider<?>> messageMapperProviders() {
+ return Collections.unmodifiableList(messageSerializerProviders);
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/MessageReader.java b/modules/network/src/main/java/org/apache/ignite/network/internal/MessageReader.java
index b46aea4..7ee564a 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/MessageReader.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/MessageReader.java
@@ -17,13 +17,292 @@
package org.apache.ignite.network.internal;
-import java.io.ObjectInputStream;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
/**
* Stateful message reader.
*/
public interface MessageReader {
- /** Get input stream. Will be replaced with Ignite 2.X message reader interface. */
- @Deprecated
- ObjectInputStream stream();
+ /**
+ * Sets the byte buffer to read from.
+ *
+ * @param buf Byte buffer.
+ */
+ public void setBuffer(ByteBuffer buf);
+
+ /**
+ * Sets the type of the message that is currently being read.
+ *
+ * @param msgCls Message type.
+ */
+ public void setCurrentReadClass(Class<? extends NetworkMessage> msgCls);
+
+ /**
+ * Callback that must be invoked by implementations of message serializers before they start decoding the message body.
+ *
+ * @return {@code True} if a read operation is allowed to proceed, {@code false} otherwise.
+ */
+ public boolean beforeMessageRead();
+
+ /**
+ * Callback that must be invoked by implementations of message serializers after they finished decoding the message body.
+ *
+ * @param msgCls Class of the message that is finishing read stage.
+ * @return {@code True} if a read operation is allowed to proceed, {@code false} otherwise.
+ */
+ public boolean afterMessageRead(Class<? extends NetworkMessage> msgCls);
+
+ /**
+ * Reads a {@code byte} value.
+ *
+ * @param name Field name.
+ * @return {@code byte} value.
+ */
+ public byte readByte(String name);
+
+ /**
+ * Reads a {@code short} value.
+ *
+ * @param name Field name.
+ * @return {@code short} value.
+ */
+ public short readShort(String name);
+
+ /**
+ * Reads an {@code int} value.
+ *
+ * @param name Field name.
+ * @return {@code int} value.
+ */
+ public int readInt(String name);
+
+ /**
+ * Reads an {@code int} value.
+ *
+ * @param name Field name.
+ * @param dflt A default value if the field is not found.
+ * @return {@code int} value.
+ */
+ public int readInt(String name, int dflt);
+
+ /**
+ * Reads a {@code long} value.
+ *
+ * @param name Field name.
+ * @return {@code long} value.
+ */
+ public long readLong(String name);
+
+ /**
+ * Reads a {@code float} value.
+ *
+ * @param name Field name.
+ * @return {@code float} value.
+ */
+ public float readFloat(String name);
+
+ /**
+ * Reads a {@code double} value.
+ *
+ * @param name Field name.
+ * @return {@code double} value.
+ */
+ public double readDouble(String name);
+
+ /**
+ * Reads a {@code char} value.
+ *
+ * @param name Field name.
+ * @return {@code char} value.
+ */
+ public char readChar(String name);
+
+ /**
+ * Reads a {@code boolean} value.
+ *
+ * @param name Field name.
+ * @return {@code boolean} value.
+ */
+ public boolean readBoolean(String name);
+
+ /**
+ * Reads a {@code byte} array.
+ *
+ * @param name Field name.
+ * @return {@code byte} array.
+ */
+ public byte[] readByteArray(String name);
+
+ /**
+ * Reads a {@code short} array.
+ *
+ * @param name Field name.
+ * @return {@code short} array.
+ */
+ public short[] readShortArray(String name);
+
+ /**
+ * Reads an {@code int} array.
+ *
+ * @param name Field name.
+ * @return {@code int} array.
+ */
+ public int[] readIntArray(String name);
+
+ /**
+ * Reads a {@code long} array.
+ *
+ * @param name Field name.
+ * @return {@code long} array.
+ */
+ public long[] readLongArray(String name);
+
+ /**
+ * Reads a {@code float} array.
+ *
+ * @param name Field name.
+ * @return {@code float} array.
+ */
+ public float[] readFloatArray(String name);
+
+ /**
+ * Reads a {@code double} array.
+ *
+ * @param name Field name.
+ * @return {@code double} array.
+ */
+ public double[] readDoubleArray(String name);
+
+ /**
+ * Reads a {@code char} array.
+ *
+ * @param name Field name.
+ * @return {@code char} array.
+ */
+ public char[] readCharArray(String name);
+
+ /**
+ * Reads a {@code boolean} array.
+ *
+ * @param name Field name.
+ * @return {@code boolean} array.
+ */
+ public boolean[] readBooleanArray(String name);
+
+ /**
+ * Reads a {@link String}.
+ *
+ * @param name Field name.
+ * @return {@link String}.
+ */
+ public String readString(String name);
+
+ /**
+ * Reads a {@link BitSet}.
+ *
+ * @param name Field name.
+ * @return {@link BitSet}.
+ */
+ public BitSet readBitSet(String name);
+
+ /**
+ * Reads a {@link UUID}.
+ *
+ * @param name Field name.
+ * @return {@link UUID}.
+ */
+ public UUID readUuid(String name);
+
+ /**
+ * Reads an {@link IgniteUuid}.
+ *
+ * @param name Field name.
+ * @return {@link IgniteUuid}.
+ */
+ public IgniteUuid readIgniteUuid(String name);
+
+ /**
+ * Reads a nested message.
+ *
+ * @param name Field name.
+ * @return Message.
+ */
+ public <T extends NetworkMessage> T readMessage(String name);
+
+ /**
+ * Reads an array of objects.
+ *
+ * @param name Field name.
+ * @param itemType A component type of the array.
+ * @param itemCls A component class of the array.
+ * @return Array of objects.
+ */
+ public <T> T[] readObjectArray(String name, MessageCollectionItemType itemType, Class<T> itemCls);
+
+ /**
+ * Reads a collection.
+ *
+ * @param name Field name.
+ * @param itemType An item type of the Collection.
+ * @return Collection.
+ */
+ public <C extends Collection<?>> C readCollection(String name, MessageCollectionItemType itemType);
+
+ /**
+ * Reads a map.
+ *
+ * @param name Field name.
+ * @param keyType The type of the map's key.
+ * @param valType The type of the map's value.
+ * @param linked Whether a {@link LinkedHashMap} should be created.
+ * @return Map.
+ */
+ public <M extends Map<?, ?>> M readMap(String name, MessageCollectionItemType keyType,
+ MessageCollectionItemType valType, boolean linked);
+
+ /**
+ * Tells whether the last invocation of any of the {@code readXXX(...)}
+ * methods has fully read the value. {@code False} is returned
+ * if there were not enough remaining bytes in a byte buffer.
+ *
+ * @return Whether the last value was fully read.
+ */
+ public boolean isLastRead();
+
+ /**
+ * Gets a current read state.
+ *
+ * @return Read state.
+ */
+ public int state();
+
+ /**
+ * Increments a read state.
+ */
+ public void incrementState();
+
+ /**
+ * Callback called before an inner message is read.
+ */
+ public void beforeInnerMessageRead();
+
+ /**
+ * Callback called after an inner message is read.
+ *
+ * @param finished Whether a message was fully read.
+ */
+ public void afterInnerMessageRead(boolean finished);
+
+ /**
+ * Resets this reader.
+ */
+ public void reset();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/MessageSerializerFactory.java b/modules/network/src/main/java/org/apache/ignite/network/internal/MessageSerializerFactory.java
new file mode 100644
index 0000000..d5d8942
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/MessageSerializerFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal;
+
+import java.util.List;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.MessageSerializerProvider;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Factory that provides message serializers and deserializers by {@link NetworkMessage#directType()}.
+ */
+public class MessageSerializerFactory {
+ /** List of all serializers. Index is the direct type of the message. */
+ private final List<MessageSerializerProvider<NetworkMessage>> serializerProviders;
+
+ /** Constructor. */
+ public MessageSerializerFactory(List<MessageSerializerProvider<NetworkMessage>> mappers) {
+ serializerProviders = mappers;
+ }
+
+ /**
+ * Creates a deserializer for a message of the given direct type.
+ * @param directType Message's direct type.
+ * @return Message deserializer.
+ */
+ public MessageDeserializer<NetworkMessage> createDeserializer(short directType) {
+ return serializerProviders.get(directType).createDeserializer();
+ }
+
+ /**
+ * Creates a serializer for a message of the given direct type.
+ * @param directType Message's direct type.
+ * @return Message serializer.
+ */
+ public MessageSerializer<NetworkMessage> createSerializer(short directType) {
+ return serializerProviders.get(directType).createSerializer();
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/MessageWriter.java b/modules/network/src/main/java/org/apache/ignite/network/internal/MessageWriter.java
index 6546647..bad436f 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/MessageWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/MessageWriter.java
@@ -17,13 +17,320 @@
package org.apache.ignite.network.internal;
-import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
/**
* Stateful message writer.
*/
public interface MessageWriter {
- /** Get output stream. Will be replaced with Ignite 2.X message writer interface. */
- @Deprecated
- ObjectOutputStream stream();
+ /**
+ * Sets the byte buffer to write to.
+ *
+ * @param buf Byte buffer.
+ */
+ public void setBuffer(ByteBuffer buf);
+
+ /**
+ * Sets the type of the message that is currently being written.
+ *
+ * @param msgCls Message type.
+ */
+ public void setCurrentWriteClass(Class<? extends NetworkMessage> msgCls);
+
+ /**
+ * Writes the header of a message.
+ *
+ * @param type Message type.
+ * @param fieldCnt Fields count.
+ * @return {@code true} if the header was written successfully, {@code false} otherwise.
+ */
+ public boolean writeHeader(short type, byte fieldCnt);
+
+ /**
+ * Writes a {@code byte} value.
+ *
+ * @param name Field name.
+ * @param val {@code byte} value.
+ * @return Whether a value was fully written.
+ */
+ public boolean writeByte(String name, byte val);
+
+ /**
+ * Writes a {@code short} value.
+ *
+ * @param name Field name.
+ * @param val {@code short} value.
+ * @return Whether a value was fully written.
+ */
+ public boolean writeShort(String name, short val);
+
+ /**
+ * Writes an {@code int} value.
+ *
+ * @param name Field name.
+ * @param val {@code int} value.
+ * @return Whether a value was fully written.
+ */
+ public boolean writeInt(String name, int val);
+
+ /**
+ * Writes a {@code long} value.
+ *
+ * @param name Field name.
+ * @param val {@code long} value.
+ * @return Whether a value was fully written.
+ */
+ public boolean writeLong(String name, long val);
+
+ /**
+ * Writes a {@code float} value.
+ *
+ * @param name Field name.
+ * @param val {@code float} value.
+ * @return Whether a value was fully written.
+ */
+ public boolean writeFloat(String name, float val);
+
+ /**
+ * Writes a {@code double} value.
+ *
+ * @param name Field name.
+ * @param val {@code double} value.
+ * @return Whether a value was fully written.
+ */
+ public boolean writeDouble(String name, double val);
+
+ /**
+ * Writes a {@code char} value.
+ *
+ * @param name Field name.
+ * @param val {@code char} value.
+ * @return Whether a value was fully written.
+ */
+ public boolean writeChar(String name, char val);
+
+ /**
+ * Writes a {@code boolean} value.
+ *
+ * @param name Field name.
+ * @param val {@code boolean} value.
+ * @return Whether a value was fully written.
+ */
+ public boolean writeBoolean(String name, boolean val);
+
+ /**
+ * Writes a {@code byte} array.
+ *
+ * @param name Field name.
+ * @param val {@code byte} array.
+ * @return Whether an array was fully written.
+ */
+ public boolean writeByteArray(String name, byte[] val);
+
+ /**
+ * Writes a {@code byte} array.
+ *
+ * @param name Field name.
+ * @param val {@code byte} array.
+ * @param off Offset.
+ * @param len Length.
+ * @return Whether an array was fully written.
+ */
+ public boolean writeByteArray(String name, byte[] val, long off, int len);
+
+ /**
+ * Writes a {@code short} array.
+ *
+ * @param name Field name.
+ * @param val {@code short} array.
+ * @return Whether an array was fully written.
+ */
+ public boolean writeShortArray(String name, short[] val);
+
+ /**
+ * Writes an {@code int} array.
+ *
+ * @param name Field name.
+ * @param val {@code int} array.
+ * @return Whether an array was fully written.
+ */
+ public boolean writeIntArray(String name, int[] val);
+
+ /**
+ * Writes a {@code long} array.
+ *
+ * @param name Field name.
+ * @param val {@code long} array.
+ * @return Whether an array was fully written.
+ */
+ public boolean writeLongArray(String name, long[] val);
+
+ /**
+ * Writes a {@code long} array.
+ *
+ * @param name Field name.
+ * @param val {@code long} array.
+ * @param len Length.
+ * @return Whether an array was fully written.
+ */
+ public boolean writeLongArray(String name, long[] val, int len);
+
+ /**
+ * Writes a {@code float} array.
+ *
+ * @param name Field name.
+ * @param val {@code float} array.
+ * @return Whether an array was fully written.
+ */
+ public boolean writeFloatArray(String name, float[] val);
+
+ /**
+ * Writes a {@code double} array.
+ *
+ * @param name Field name.
+ * @param val {@code double} array.
+ * @return Whether an array was fully written.
+ */
+ public boolean writeDoubleArray(String name, double[] val);
+
+ /**
+ * Writes a {@code char} array.
+ *
+ * @param name Field name.
+ * @param val {@code char} array.
+ * @return Whether an array was fully written.
+ */
+ public boolean writeCharArray(String name, char[] val);
+
+ /**
+ * Writes a {@code boolean} array.
+ *
+ * @param name Field name.
+ * @param val {@code boolean} array.
+ * @return Whether an array was fully written.
+ */
+ public boolean writeBooleanArray(String name, boolean[] val);
+
+ /**
+ * Writes a {@link String}.
+ *
+ * @param name Field name.
+ * @param val {@link String}.
+ * @return Whether a value was fully written.
+ */
+ public boolean writeString(String name, String val);
+
+ /**
+ * Writes a {@link BitSet}.
+ *
+ * @param name Field name.
+ * @param val {@link BitSet}.
+ * @return Whether a value was fully written.
+ */
+ public boolean writeBitSet(String name, BitSet val);
+
+ /**
+ * Writes a {@link UUID}.
+ *
+ * @param name Field name.
+ * @param val {@link UUID}.
+ * @return Whether a value was fully written.
+ */
+ public boolean writeUuid(String name, UUID val);
+
+ /**
+ * Writes an {@link IgniteUuid}.
+ *
+ * @param name Field name.
+ * @param val {@link IgniteUuid}.
+ * @return Whether a value was fully written.
+ */
+ public boolean writeIgniteUuid(String name, IgniteUuid val);
+
+ /**
+ * Writes a nested message.
+ *
+ * @param name Field name.
+ * @param val Message.
+ * @return Whether a value was fully written.
+ */
+ public boolean writeMessage(String name, NetworkMessage val);
+
+ /**
+ * Writes an array of objects.
+ *
+ * @param name Field name.
+ * @param arr Array of objects.
+ * @param itemType A component type of the array.
+ * @return Whether an array was fully written.
+ */
+ public <T> boolean writeObjectArray(String name, T[] arr, MessageCollectionItemType itemType);
+
+ /**
+ * Writes a collection.
+ *
+ * @param name Field name.
+ * @param col Collection.
+ * @param itemType An item type of the collection.
+ * @return Whether a value was fully written.
+ */
+ public <T> boolean writeCollection(String name, Collection<T> col, MessageCollectionItemType itemType);
+
+ /**
+ * Writes a map.
+ *
+ * @param name Field name.
+ * @param map Map.
+ * @param keyType The type of the map's key.
+ * @param valType The type of the map's value.
+ * @return Whether a value was fully written.
+ */
+ public <K, V> boolean writeMap(String name, Map<K, V> map, MessageCollectionItemType keyType,
+ MessageCollectionItemType valType);
+
+ /**
+ * @return Whether the header of the current message is already written.
+ */
+ public boolean isHeaderWritten();
+
+ /**
+ * Callback called when the header of the message is written.
+ */
+ public void onHeaderWritten();
+
+ /**
+ * Gets a current message state.
+ *
+ * @return State.
+ */
+ public int state();
+
+ /**
+ * Increments a state.
+ */
+ public void incrementState();
+
+ /**
+ * Callback called before an inner message is written.
+ */
+ public void beforeInnerMessageWrite();
+
+ /**
+ * Callback called after an inner message is written.
+ *
+ * @param finished Whether the message was fully written.
+ */
+ public void afterInnerMessageWrite(boolean finished);
+
+ /**
+ * Resets this writer.
+ */
+ public void reset();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageReader.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageReader.java
new file mode 100644
index 0000000..2d4ebbc
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageReader.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.direct;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.internal.MessageSerializerFactory;
+import org.apache.ignite.network.internal.direct.state.DirectMessageState;
+import org.apache.ignite.network.internal.direct.state.DirectMessageStateItem;
+import org.apache.ignite.network.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.network.internal.direct.stream.DirectByteBufferStreamImplV1;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Message reader implementation.
+ */
+public class DirectMessageReader implements MessageReader {
+ /** State. */
+ private final DirectMessageState<StateItem> state;
+
+ /** Whether last field was fully read. */
+ private boolean lastRead;
+
+ /**
+ * @param msgFactory Message serializer factory.
+ * @param protoVer Protocol version.
+ */
+ public DirectMessageReader(final MessageSerializerFactory msgFactory, final byte protoVer) {
+ state = new DirectMessageState<>(StateItem.class, new Supplier<StateItem>() {
+ @Override public StateItem get() {
+ return new StateItem(msgFactory, protoVer);
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setBuffer(ByteBuffer buf) {
+ state.item().stream.setBuffer(buf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setCurrentReadClass(Class<? extends NetworkMessage> msgCls) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean beforeMessageRead() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean afterMessageRead(Class<? extends NetworkMessage> msgCls) {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte readByte(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ byte val = stream.readByte();
+
+ lastRead = stream.lastFinished();
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short readShort(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ short val = stream.readShort();
+
+ lastRead = stream.lastFinished();
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readInt(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ int val = stream.readInt();
+
+ lastRead = stream.lastFinished();
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readInt(String name, int dflt) {
+ return readInt(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long readLong(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ long val = stream.readLong();
+
+ lastRead = stream.lastFinished();
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float readFloat(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ float val = stream.readFloat();
+
+ lastRead = stream.lastFinished();
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public double readDouble(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ double val = stream.readDouble();
+
+ lastRead = stream.lastFinished();
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public char readChar(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ char val = stream.readChar();
+
+ lastRead = stream.lastFinished();
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readBoolean(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ boolean val = stream.readBoolean();
+
+ lastRead = stream.lastFinished();
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable
+ @Override public byte[] readByteArray(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ byte[] arr = stream.readByteArray();
+
+ lastRead = stream.lastFinished();
+
+ return arr;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public short[] readShortArray(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ short[] arr = stream.readShortArray();
+
+ lastRead = stream.lastFinished();
+
+ return arr;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public int[] readIntArray(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ int[] arr = stream.readIntArray();
+
+ lastRead = stream.lastFinished();
+
+ return arr;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public long[] readLongArray(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ long[] arr = stream.readLongArray();
+
+ lastRead = stream.lastFinished();
+
+ return arr;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public float[] readFloatArray(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ float[] arr = stream.readFloatArray();
+
+ lastRead = stream.lastFinished();
+
+ return arr;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public double[] readDoubleArray(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ double[] arr = stream.readDoubleArray();
+
+ lastRead = stream.lastFinished();
+
+ return arr;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public char[] readCharArray(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ char[] arr = stream.readCharArray();
+
+ lastRead = stream.lastFinished();
+
+ return arr;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public boolean[] readBooleanArray(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ boolean[] arr = stream.readBooleanArray();
+
+ lastRead = stream.lastFinished();
+
+ return arr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String readString(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ String val = stream.readString();
+
+ lastRead = stream.lastFinished();
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public BitSet readBitSet(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ BitSet val = stream.readBitSet();
+
+ lastRead = stream.lastFinished();
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID readUuid(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ UUID val = stream.readUuid();
+
+ lastRead = stream.lastFinished();
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid readIgniteUuid(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ IgniteUuid val = stream.readIgniteUuid();
+
+ lastRead = stream.lastFinished();
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public <T extends NetworkMessage> T readMessage(String name) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ T msg = stream.readMessage(this);
+
+ lastRead = stream.lastFinished();
+
+ return msg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T[] readObjectArray(String name, MessageCollectionItemType itemType, Class<T> itemCls) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ T[] msg = stream.readObjectArray(itemType, itemCls, this);
+
+ lastRead = stream.lastFinished();
+
+ return msg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <C extends Collection<?>> C readCollection(String name, MessageCollectionItemType itemType) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ C col = stream.readCollection(itemType, this);
+
+ lastRead = stream.lastFinished();
+
+ return col;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <M extends Map<?, ?>> M readMap(String name, MessageCollectionItemType keyType,
+ MessageCollectionItemType valType, boolean linked) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ M map = stream.readMap(keyType, valType, linked, this);
+
+ lastRead = stream.lastFinished();
+
+ return map;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isLastRead() {
+ return lastRead;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int state() {
+ return state.item().state;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void incrementState() {
+ state.item().state++;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeInnerMessageRead() {
+ state.forward();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void afterInnerMessageRead(boolean finished) {
+ state.backward(finished);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ state.reset();
+ }
+
+ /**
+ */
+ private static class StateItem implements DirectMessageStateItem {
+ /** Stream. */
+ private final DirectByteBufferStream stream;
+
+ /** State. */
+ private int state;
+
+ /**
+ * @param msgFactory Message serializer factory.
+ * @param protoVer Protocol version.
+ */
+ StateItem(MessageSerializerFactory msgFactory, byte protoVer) {
+ switch (protoVer) {
+ case 1:
+ stream = new DirectByteBufferStreamImplV1(msgFactory);
+
+ break;
+
+ default:
+ throw new IllegalStateException("Invalid protocol version: " + protoVer);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ state = 0;
+ }
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java
new file mode 100644
index 0000000..1fbe6e4
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectMessageWriter.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.direct;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.network.internal.MessageWriter;
+import org.apache.ignite.network.internal.direct.state.DirectMessageState;
+import org.apache.ignite.network.internal.direct.state.DirectMessageStateItem;
+import org.apache.ignite.network.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.network.internal.direct.stream.DirectByteBufferStreamImplV1;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link MessageWriter} that writes through a {@link DirectByteBufferStream}, keeping one state item per nesting level.
+ */
+public class DirectMessageWriter implements MessageWriter {
+ /** Stack of per-level state items; the current item supplies the stream. */
+ private final DirectMessageState<StateItem> state;
+
+ /**
+ * @param protoVer Protocol version; passed to every state item that gets created.
+ */
+ public DirectMessageWriter(final byte protoVer) {
+ state = new DirectMessageState<>(StateItem.class, new Supplier<StateItem>() {
+ @Override public StateItem get() {
+ return new StateItem(protoVer);
+ }
+ });
+ }
+
+ /** {@inheritDoc} The buffer is set on the stream of the current state item only. */
+ @Override public void setBuffer(ByteBuffer buf) {
+ state.item().stream.setBuffer(buf);
+ }
+
+ /** {@inheritDoc} This implementation ignores the class. */
+ @Override public void setCurrentWriteClass(Class<? extends NetworkMessage> msgCls) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} Only the message type is written; {@code fieldCnt} is not used here. */
+ @Override public boolean writeHeader(short type, byte fieldCnt) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeShort(type);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeByte(String name, byte val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeByte(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeShort(String name, short val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeShort(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeInt(String name, int val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeInt(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeLong(String name, long val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeLong(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeFloat(String name, float val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeFloat(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeDouble(String name, double val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeDouble(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeChar(String name, char val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeChar(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeBoolean(String name, boolean val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeBoolean(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeByteArray(String name, @Nullable byte[] val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeByteArray(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeByteArray(String name, byte[] val, long off, int len) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeByteArray(val, off, len);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeShortArray(String name, @Nullable short[] val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeShortArray(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeIntArray(String name, @Nullable int[] val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeIntArray(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeLongArray(String name, @Nullable long[] val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeLongArray(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeLongArray(String name, long[] val, int len) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeLongArray(val, len);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeFloatArray(String name, @Nullable float[] val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeFloatArray(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeDoubleArray(String name, @Nullable double[] val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeDoubleArray(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeCharArray(String name, @Nullable char[] val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeCharArray(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeBooleanArray(String name, @Nullable boolean[] val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeBooleanArray(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeString(String name, String val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeString(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeBitSet(String name, BitSet val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeBitSet(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeUuid(String name, UUID val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeUuid(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeIgniteUuid(String name, IgniteUuid val) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeIgniteUuid(val);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} This writer is handed to the stream together with the message. */
+ @Override public boolean writeMessage(String name, @Nullable NetworkMessage msg) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeMessage(msg, this);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} This writer is handed to the stream together with the array. */
+ @Override public <T> boolean writeObjectArray(String name, T[] arr, MessageCollectionItemType itemType) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeObjectArray(arr, itemType, this);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} This writer is handed to the stream together with the collection. */
+ @Override public <T> boolean writeCollection(String name, Collection<T> col, MessageCollectionItemType itemType) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeCollection(col, itemType, this);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} This writer is handed to the stream together with the map. */
+ @Override public <K, V> boolean writeMap(String name, Map<K, V> map, MessageCollectionItemType keyType,
+ MessageCollectionItemType valType) {
+ final DirectByteBufferStream out = state.item().stream;
+
+ out.writeMap(map, keyType, valType, this);
+
+ return out.lastFinished();
+ }
+
+ /** {@inheritDoc} Reads the header flag of the current state item. */
+ @Override public boolean isHeaderWritten() {
+ return this.state.item().hdrWritten;
+ }
+
+ /** {@inheritDoc} Sets the header flag of the current state item. */
+ @Override public void onHeaderWritten() {
+ this.state.item().hdrWritten = true;
+ }
+
+ /** {@inheritDoc} Reads the {@code state} counter of the current state item. */
+ @Override public int state() {
+ return this.state.item().state;
+ }
+
+ /** {@inheritDoc} Advances the {@code state} counter of the current state item by one. */
+ @Override public void incrementState() {
+ state.item().state += 1;
+ }
+
+ /** {@inheritDoc} Moves the state stack forward so that the inner message gets its own state item. */
+ @Override public void beforeInnerMessageWrite() {
+ this.state.forward();
+ }
+
+ /** {@inheritDoc} Moves the state stack backward; {@code finished} is passed on as the reset flag. */
+ @Override public void afterInnerMessageWrite(boolean finished) {
+ this.state.backward(finished);
+ }
+
+ /** {@inheritDoc} Delegates to the reset of the state stack. */
+ @Override public void reset() {
+ this.state.reset();
+ }
+
+ /**
+ * State of a single nesting level of the writer.
+ */
+ private static class StateItem implements DirectMessageStateItem {
+ /** Stream used to write at this level. */
+ private final DirectByteBufferStream stream;
+
+ /** Counter returned by {@code state()} and advanced by {@code incrementState()}. */
+ private int state;
+
+ /** Header flag set by {@code onHeaderWritten()} and read by {@code isHeaderWritten()}. */
+ private boolean hdrWritten;
+
+ /**
+ * @param protoVer Protocol version; only version 1 is supported.
+ * @throws IllegalStateException If the protocol version is not supported.
+ */
+ StateItem(byte protoVer) {
+ if (protoVer != 1)
+ throw new IllegalStateException("Invalid protocol version: " + protoVer);
+
+ stream = new DirectByteBufferStreamImplV1(null);
+ }
+
+ /** Clears the counter and the header flag; the stream object is kept. */
+ @Override public void reset() {
+ state = 0;
+ hdrWritten = false;
+ }
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/state/DirectMessageState.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/state/DirectMessageState.java
new file mode 100644
index 0000000..2e7e5ac
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/state/DirectMessageState.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.direct.state;
+
+import java.lang.reflect.Array;
+import java.util.function.Supplier;
+
+/**
+ * Stack of per-level message state items; items are created by the factory the first time a level is entered.
+ */
+public class DirectMessageState<T extends DirectMessageStateItem> {
+ /** Capacity the stack array starts with. */
+ private static final int INIT_SIZE = 10;
+
+ /** Produces the item for a level that has no item yet. */
+ private final Supplier<T> factory;
+
+ /** Items by level; slots of levels never entered are {@code null}. */
+ private T[] stack;
+
+ /** Index of the current level in {@link #stack}. */
+ private int pos;
+
+ /**
+ * @param cls State item type, used as the component type of the stack array.
+ * @param factory Item factory; invoked once here to create the item of level 0.
+ */
+ public DirectMessageState(Class<T> cls, Supplier<T> factory) {
+ this.factory = factory;
+
+ T[] levels = (T[])Array.newInstance(cls, INIT_SIZE);
+ levels[0] = factory.get();
+ stack = levels;
+ }
+
+ /**
+ * @return Item of the current level.
+ */
+ public T item() {
+ return stack[pos];
+ }
+
+ /**
+ * Enters the next level: doubles the stack array when it is full and creates the level's item if it is missing.
+ */
+ public void forward() {
+ final int next = ++pos;
+
+ if (next == stack.length) {
+ T[] grown = (T[])Array.newInstance(stack.getClass().getComponentType(), stack.length << 1);
+
+ System.arraycopy(stack, 0, grown, 0, stack.length);
+
+ stack = grown;
+ }
+
+ if (stack[next] == null)
+ stack[next] = factory.get();
+ }
+
+ /**
+ * Leaves the current level and returns to the previous one.
+ *
+ * @param reset Whether to reset the item of the level being left.
+ */
+ public void backward(boolean reset) {
+ if (reset)
+ item().reset();
+
+ --pos;
+ }
+
+ /**
+ * Resets the item of level 0; asserts that the current level is 0.
+ */
+ public void reset() {
+ assert pos == 0;
+
+ stack[0].reset();
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/message/NetworkMessage.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/state/DirectMessageStateItem.java
similarity index 81%
copy from modules/network/src/main/java/org/apache/ignite/network/message/NetworkMessage.java
copy to modules/network/src/main/java/org/apache/ignite/network/internal/direct/state/DirectMessageStateItem.java
index 50b4975..9ac1cd9 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/message/NetworkMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/state/DirectMessageStateItem.java
@@ -14,14 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.network.message;
+
+package org.apache.ignite.network.internal.direct.state;
/**
- * Message for exchange information in cluster.
+ * Message state item.
*/
-public interface NetworkMessage {
+public interface DirectMessageStateItem {
/**
- * @return Message type.
+ * Resets the state.
*/
- public abstract short directType();
+ public void reset();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStream.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStream.java
new file mode 100644
index 0000000..a0596dc
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStream.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.direct.stream;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.internal.MessageWriter;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+
+public interface DirectByteBufferStream {
+ /**
+ * @param buf Buffer that subsequent read and write calls operate on.
+ */
+ public void setBuffer(ByteBuffer buf);
+
+ /**
+ * @return Number of bytes remaining in the current buffer.
+ */
+ public int remaining();
+
+ /**
+ * @return Whether the last object was fully written or read.
+ */
+ public boolean lastFinished();
+
+ /**
+ * @param val Byte value to write.
+ */
+ public void writeByte(byte val);
+
+ /**
+ * @param val Short value to write.
+ */
+ public void writeShort(short val);
+
+ /**
+ * @param val Int value to write.
+ */
+ public void writeInt(int val);
+
+ /**
+ * @param val Long value to write.
+ */
+ public void writeLong(long val);
+
+ /**
+ * @param val Float value to write.
+ */
+ public void writeFloat(float val);
+
+ /**
+ * @param val Double value to write.
+ */
+ public void writeDouble(double val);
+
+ /**
+ * @param val Char value to write.
+ */
+ public void writeChar(char val);
+
+ /**
+ * @param val Boolean value to write.
+ */
+ public void writeBoolean(boolean val);
+
+ /**
+ * @param val Byte array to write.
+ */
+ public void writeByteArray(byte[] val);
+
+ /**
+ * @param val Byte array to write from.
+ * @param off Offset of the first byte to write, relative to the start of the array.
+ * @param len Number of bytes to write.
+ */
+ public void writeByteArray(byte[] val, long off, int len);
+
+ /**
+ * @param val Short array to write.
+ */
+ public void writeShortArray(short[] val);
+
+ /**
+ * @param val Int array to write.
+ */
+ public void writeIntArray(int[] val);
+
+ /**
+ * @param val Long array to write.
+ */
+ public void writeLongArray(long[] val);
+
+ /**
+ * @param val Long array to write from.
+ * @param len Number of elements to write.
+ */
+ public void writeLongArray(long[] val, int len);
+
+ /**
+ * @param val Float array to write.
+ */
+ public void writeFloatArray(float[] val);
+
+ /**
+ * @param val Double array to write.
+ */
+ public void writeDoubleArray(double[] val);
+
+ /**
+ * @param val Char array to write.
+ */
+ public void writeCharArray(char[] val);
+
+ /**
+ * @param val Boolean array to write.
+ */
+ public void writeBooleanArray(boolean[] val);
+
+ /**
+ * @param val String to write.
+ */
+ public void writeString(String val);
+
+ /**
+ * @param val Bit set to write.
+ */
+ public void writeBitSet(BitSet val);
+
+ /**
+ * @param val UUID to write.
+ */
+ public void writeUuid(UUID val);
+
+ /**
+ * @param val Ignite UUID to write.
+ */
+ public void writeIgniteUuid(IgniteUuid val);
+
+ /**
+ * @param msg Message to write.
+ * @param writer Message writer; {@code DirectMessageWriter} passes itself.
+ */
+ public void writeMessage(NetworkMessage msg, MessageWriter writer);
+
+ /**
+ * @param arr Array to write.
+ * @param itemType Type of the array elements.
+ * @param writer Message writer; {@code DirectMessageWriter} passes itself.
+ */
+ public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer);
+
+ /**
+ * @param col Collection to write.
+ * @param itemType Type of the collection elements.
+ * @param writer Message writer; {@code DirectMessageWriter} passes itself.
+ */
+ public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType, MessageWriter writer);
+
+ /**
+ * @param map Map to write.
+ * @param keyType Type of the map keys.
+ * @param valType Type of the map values.
+ * @param writer Message writer; {@code DirectMessageWriter} passes itself.
+ */
+ public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, MessageCollectionItemType valType,
+ MessageWriter writer);
+
+ /**
+ * @return Byte value read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public byte readByte();
+
+ /**
+ * @return Short value read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public short readShort();
+
+ /**
+ * @return Int value read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public int readInt();
+
+ /**
+ * @return Long value read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public long readLong();
+
+ /**
+ * @return Float value read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public float readFloat();
+
+ /**
+ * @return Double value read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public double readDouble();
+
+ /**
+ * @return Char value read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public char readChar();
+
+ /**
+ * @return Boolean value read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public boolean readBoolean();
+
+ /**
+ * @return Byte array read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public byte[] readByteArray();
+
+ /**
+ * @return Short array read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public short[] readShortArray();
+
+ /**
+ * @return Int array read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public int[] readIntArray();
+
+ /**
+ * @return Long array read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public long[] readLongArray();
+
+ /**
+ * @return Float array read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public float[] readFloatArray();
+
+ /**
+ * @return Double array read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public double[] readDoubleArray();
+
+ /**
+ * @return Char array read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public char[] readCharArray();
+
+ /**
+ * @return Boolean array read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public boolean[] readBooleanArray();
+
+ /**
+ * @return String read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public String readString();
+
+ /**
+ * @return Bit set read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public BitSet readBitSet();
+
+ /**
+ * @return UUID read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public UUID readUuid();
+
+ /**
+ * @return Ignite UUID read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public IgniteUuid readIgniteUuid();
+
+ /**
+ * @param reader Message reader; {@code DirectMessageReader} passes itself to the collection and map reads.
+ * @return Message read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public <T extends NetworkMessage> T readMessage(MessageReader reader);
+
+ /**
+ * @param itemType Type of the array elements.
+ * @param itemCls Class of the array elements.
+ * @param reader Message reader.
+ * @return Array read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls, MessageReader reader);
+
+ /**
+ * @param itemType Type of the collection elements.
+ * @param reader Message reader; {@code DirectMessageReader} passes itself.
+ * @return Collection read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, MessageReader reader);
+
+ /**
+ * @param keyType Type of the map keys.
+ * @param valType Type of the map values.
+ * @param linked Whether linked map should be created.
+ * @param reader Message reader; {@code DirectMessageReader} passes itself.
+ * @return Map read; see {@link #lastFinished()} for whether the read completed.
+ */
+ public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType,
+ boolean linked, MessageReader reader);
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
new file mode 100644
index 0000000..06f1a8d
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
@@ -0,0 +1,1805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.direct.stream;
+
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.UUID;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.internal.MessageSerializerFactory;
+import org.apache.ignite.network.internal.MessageWriter;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+
+import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF;
+import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF;
+
+public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
+ /** Shared empty array returned by {@link #BYTE_ARR_CREATOR} for a zero length. */
+ private static final byte[] BYTE_ARR_EMPTY = new byte[0];
+
+ /** Shared empty array returned by {@link #SHORT_ARR_CREATOR} for a zero length. */
+ private static final short[] SHORT_ARR_EMPTY = new short[0];
+
+ /** Shared empty array returned by {@link #INT_ARR_CREATOR} for a zero length. */
+ private static final int[] INT_ARR_EMPTY = new int[0];
+
+ /** Shared empty array returned by {@link #LONG_ARR_CREATOR} for a zero length. */
+ private static final long[] LONG_ARR_EMPTY = new long[0];
+
+ /** Shared empty array returned by {@link #FLOAT_ARR_CREATOR} for a zero length. */
+ private static final float[] FLOAT_ARR_EMPTY = new float[0];
+
+ /** Shared empty array returned by {@link #DOUBLE_ARR_CREATOR} for a zero length. */
+ private static final double[] DOUBLE_ARR_EMPTY = new double[0];
+
+ /** Shared empty array returned by {@link #CHAR_ARR_CREATOR} for a zero length. */
+ private static final char[] CHAR_ARR_EMPTY = new char[0];
+
+ /** Shared empty array returned by {@link #BOOLEAN_ARR_CREATOR} for a zero length. */
+ private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0];
+
+ /**
+ * Creates {@code byte[]} instances of a requested length: a negative length is rejected
+ * and a zero length yields the shared {@link #BYTE_ARR_EMPTY}.
+ */
+ private static final ArrayCreator<byte[]> BYTE_ARR_CREATOR = new ArrayCreator<byte[]>() {
+ @Override public byte[] create(int len) {
+ if (len < 0)
+ throw new IgniteInternalException("Read invalid byte array length: " + len);
+
+ if (len == 0)
+ return BYTE_ARR_EMPTY;
+
+ return new byte[len];
+ }
+ };
+
+ /**
+ * Creates {@code short[]} instances of a requested length: a negative length is rejected
+ * and a zero length yields the shared {@link #SHORT_ARR_EMPTY}.
+ */
+ private static final ArrayCreator<short[]> SHORT_ARR_CREATOR = new ArrayCreator<short[]>() {
+ @Override public short[] create(int len) {
+ if (len < 0)
+ throw new IgniteInternalException("Read invalid short array length: " + len);
+
+ if (len == 0)
+ return SHORT_ARR_EMPTY;
+
+ return new short[len];
+ }
+ };
+
+ /**
+ * Creates {@code int[]} instances of a requested length: a negative length is rejected
+ * and a zero length yields the shared {@link #INT_ARR_EMPTY}.
+ */
+ private static final ArrayCreator<int[]> INT_ARR_CREATOR = new ArrayCreator<int[]>() {
+ @Override public int[] create(int len) {
+ if (len < 0)
+ throw new IgniteInternalException("Read invalid int array length: " + len);
+
+ if (len == 0)
+ return INT_ARR_EMPTY;
+
+ return new int[len];
+ }
+ };
+
+ /**
+ * Creates {@code long[]} instances of a requested length: a negative length is rejected
+ * and a zero length yields the shared {@link #LONG_ARR_EMPTY}.
+ */
+ private static final ArrayCreator<long[]> LONG_ARR_CREATOR = new ArrayCreator<long[]>() {
+ @Override public long[] create(int len) {
+ if (len < 0)
+ throw new IgniteInternalException("Read invalid long array length: " + len);
+
+ if (len == 0)
+ return LONG_ARR_EMPTY;
+
+ return new long[len];
+ }
+ };
+
+ /**
+ * Creates {@code float[]} instances of a requested length: a negative length is rejected
+ * and a zero length yields the shared {@link #FLOAT_ARR_EMPTY}.
+ */
+ private static final ArrayCreator<float[]> FLOAT_ARR_CREATOR = new ArrayCreator<float[]>() {
+ @Override public float[] create(int len) {
+ if (len < 0)
+ throw new IgniteInternalException("Read invalid float array length: " + len);
+
+ if (len == 0)
+ return FLOAT_ARR_EMPTY;
+
+ return new float[len];
+ }
+ };
+
+ /**
+ * Creates {@code double[]} instances of a requested length: a negative length is rejected
+ * and a zero length yields the shared {@link #DOUBLE_ARR_EMPTY}.
+ */
+ private static final ArrayCreator<double[]> DOUBLE_ARR_CREATOR = new ArrayCreator<double[]>() {
+ @Override public double[] create(int len) {
+ if (len < 0)
+ throw new IgniteInternalException("Read invalid double array length: " + len);
+
+ if (len == 0)
+ return DOUBLE_ARR_EMPTY;
+
+ return new double[len];
+ }
+ };
+
+ /**
+ * Creates {@code char[]} instances of a requested length: a negative length is rejected
+ * and a zero length yields the shared {@link #CHAR_ARR_EMPTY}.
+ */
+ private static final ArrayCreator<char[]> CHAR_ARR_CREATOR = new ArrayCreator<char[]>() {
+ @Override public char[] create(int len) {
+ if (len < 0)
+ throw new IgniteInternalException("Read invalid char array length: " + len);
+
+ if (len == 0)
+ return CHAR_ARR_EMPTY;
+
+ return new char[len];
+ }
+ };
+
+ /**
+ * Creates {@code boolean[]} instances of a requested length: a negative length is rejected
+ * and a zero length yields the shared {@link #BOOLEAN_ARR_EMPTY}.
+ */
+ private static final ArrayCreator<boolean[]> BOOLEAN_ARR_CREATOR = new ArrayCreator<boolean[]>() {
+ @Override public boolean[] create(int len) {
+ if (len < 0)
+ throw new IgniteInternalException("Read invalid boolean array length: " + len);
+
+ if (len == 0)
+ return BOOLEAN_ARR_EMPTY;
+
+ return new boolean[len];
+ }
+ };
+
+ /** Sentinel object used as the initial value of {@link #arrCur}, {@link #mapCur} and {@link #cur}. */
+ private static final Object NULL = new Object();
+
+ /** Message factory received by the constructor. */
+ private final MessageSerializerFactory msgSerFactory;
+
+ /** Buffer supplied through {@link #setBuffer(ByteBuffer)}. */
+ private ByteBuffer buf;
+
+ /** Backing array of {@link #buf}, or {@code null} when the buffer is direct. */
+ private byte[] heapArr;
+
+ /** Base added to the buffer position: the buffer address for a direct buffer, {@code BYTE_ARR_OFF} otherwise. */
+ private long baseOff;
+
+ /** */
+ private int arrOff = -1;
+
+ /** */
+ private Object tmpArr;
+
+ /** */
+ private int tmpArrOff;
+
+ /** Number of bytes of the boundary value, read from previous message. */
+ private int valReadBytes;
+
+ /** */
+ private int tmpArrBytes;
+
+ /** */
+ private boolean msgTypeDone;
+
+ /** */
+ private MessageDeserializer<NetworkMessage> msgDeserializer;
+
+ /** */
+ private Iterator<?> mapIt;
+
+ /** */
+ private Iterator<?> it;
+
+ /** */
+ private int arrPos = -1;
+
+ /** */
+ private Object arrCur = NULL;
+
+ /** */
+ private Object mapCur = NULL;
+
+ /** */
+ private Object cur = NULL;
+
+ /** */
+ private boolean keyDone;
+
+ /** */
+ private int readSize = -1;
+
+ /** */
+ private int readItems;
+
+ /** */
+ private Object[] objArr;
+
+ /** */
+ private Collection<Object> col;
+
+ /** */
+ private Map<Object, Object> map;
+
+ /** */
+ private long prim;
+
+ /** */
+ private int primShift;
+
+ /** */
+ private int uuidState;
+
+ /** */
+ private long uuidMost;
+
+ /** */
+ private long uuidLeast;
+
+ /** */
+ private long uuidLocId;
+
+ /** Completion flag of the last operation, returned by {@link #lastFinished()}. */
+ protected boolean lastFinished;
+
+ /** Byte-array representation of a string. */
+ private byte[] curStrBackingArr;
+
+ /**
+ * @param msgSerFactory Message factory; stored as is, and {@code DirectMessageWriter} passes {@code null} here.
+ */
+ public DirectByteBufferStreamImplV1(MessageSerializerFactory msgSerFactory) {
+ this.msgSerFactory = msgSerFactory;
+ }
+
+ /** {@inheritDoc} The heap array and base offset are re-derived only when a different buffer instance is given. */
+ @Override public void setBuffer(ByteBuffer buf) {
+ assert buf != null;
+
+ if (this.buf == buf)
+ return;
+
+ this.buf = buf;
+ heapArr = buf.isDirect() ? null : buf.array();
+ baseOff = buf.isDirect() ? GridUnsafe.bufferAddress(buf) : BYTE_ARR_OFF;
+ }
+
+ /** {@inheritDoc} Delegates to the current buffer. */
+ @Override public int remaining() {
+ return this.buf.remaining();
+ }
+
+ /** {@inheritDoc} Returns the flag set by the most recent operation. */
+ @Override public boolean lastFinished() {
+ return this.lastFinished;
+ }
+
+ /** {@inheritDoc} Writes one byte at the buffer position, or nothing if no byte remains. */
+ @Override public void writeByte(byte val) {
+ lastFinished = buf.remaining() >= 1;
+
+ if (!lastFinished)
+ return;
+
+ final int pos = buf.position();
+ GridUnsafe.putByte(heapArr, baseOff + pos, val);
+
+ buf.position(pos + 1);
+ }
+
+ /** {@inheritDoc} Writes two bytes (LE variant when {@code BIG_ENDIAN} is set), or nothing if fewer remain. */
+ @Override public void writeShort(short val) {
+ lastFinished = buf.remaining() >= 2;
+
+ if (!lastFinished)
+ return;
+
+ final int pos = buf.position();
+ final long addr = baseOff + pos;
+
+ if (BIG_ENDIAN)
+ GridUnsafe.putShortLE(heapArr, addr, val);
+ else
+ GridUnsafe.putShort(heapArr, addr, val);
+
+ buf.position(pos + 2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeInt(int val) {
+ lastFinished = buf.remaining() >= 5;
+
+ if (lastFinished) {
+ if (val == Integer.MAX_VALUE)
+ val = Integer.MIN_VALUE;
+ else
+ val++;
+
+ int pos = buf.position();
+
+ while ((val & 0xFFFF_FF80) != 0) {
+ byte b = (byte)(val | 0x80);
+
+ GridUnsafe.putByte(heapArr, baseOff + pos++, b);
+
+ val >>>= 7;
+ }
+
+ GridUnsafe.putByte(heapArr, baseOff + pos++, (byte)val);
+
+ buf.position(pos);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLong(long val) {
+ lastFinished = buf.remaining() >= 10;
+
+ if (lastFinished) {
+ if (val == Long.MAX_VALUE)
+ val = Long.MIN_VALUE;
+ else
+ val++;
+
+ int pos = buf.position();
+
+ while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
+ byte b = (byte)(val | 0x80);
+
+ GridUnsafe.putByte(heapArr, baseOff + pos++, b);
+
+ val >>>= 7;
+ }
+
+ GridUnsafe.putByte(heapArr, baseOff + pos++, (byte)val);
+
+ buf.position(pos);
+ }
+ }
+
+ /** {@inheritDoc} Writes four bytes (LE variant when {@code BIG_ENDIAN} is set), or nothing if fewer remain. */
+ @Override public void writeFloat(float val) {
+ lastFinished = buf.remaining() >= 4;
+
+ if (!lastFinished)
+ return;
+
+ final int pos = buf.position();
+ final long addr = baseOff + pos;
+
+ if (BIG_ENDIAN)
+ GridUnsafe.putFloatLE(heapArr, addr, val);
+ else
+ GridUnsafe.putFloat(heapArr, addr, val);
+
+ buf.position(pos + 4);
+ }
+
+ /** {@inheritDoc} Writes eight bytes (LE variant when {@code BIG_ENDIAN} is set), or nothing if fewer remain. */
+ @Override public void writeDouble(double val) {
+ lastFinished = buf.remaining() >= 8;
+
+ if (!lastFinished)
+ return;
+
+ final int pos = buf.position();
+ final long addr = baseOff + pos;
+
+ if (BIG_ENDIAN)
+ GridUnsafe.putDoubleLE(heapArr, addr, val);
+ else
+ GridUnsafe.putDouble(heapArr, addr, val);
+
+ buf.position(pos + 8);
+ }
+
+ /** {@inheritDoc} Writes two bytes (LE variant when {@code BIG_ENDIAN} is set), or nothing if fewer remain. */
+ @Override public void writeChar(char val) {
+ lastFinished = buf.remaining() >= 2;
+
+ if (!lastFinished)
+ return;
+
+ final int pos = buf.position();
+ final long addr = baseOff + pos;
+
+ if (BIG_ENDIAN)
+ GridUnsafe.putCharLE(heapArr, addr, val);
+ else
+ GridUnsafe.putChar(heapArr, addr, val);
+
+ buf.position(pos + 2);
+ }
+
+ /** {@inheritDoc} Writes one byte at the buffer position, or nothing if no byte remains. */
+ @Override public void writeBoolean(boolean val) {
+ lastFinished = buf.remaining() >= 1;
+
+ if (!lastFinished)
+ return;
+
+ final int pos = buf.position();
+ GridUnsafe.putBoolean(heapArr, baseOff + pos, val);
+
+ buf.position(pos + 1);
+ }
+
+ /** {@inheritDoc} A {@code null} array is written as the int {@code -1}. */
+ @Override public void writeByteArray(byte[] val) {
+ if (val == null)
+ writeInt(-1);
+ else
+ lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length);
+ }
+
+ /** {@inheritDoc} A {@code null} array is written as the int {@code -1}; {@code off} is added to the array base. */
+ @Override public void writeByteArray(byte[] val, long off, int len) {
+ if (val == null)
+ writeInt(-1);
+ else
+ lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len);
+ }
+
+ /** {@inheritDoc} A {@code null} array is written as the int {@code -1}. */
+ @Override public void writeShortArray(short[] val) {
+ if (val == null)
+ writeInt(-1);
+ else if (BIG_ENDIAN)
+ // BIG_ENDIAN set: delegate to the little-endian array writer.
+ lastFinished = writeArrayLE(val, SHORT_ARR_OFF, val.length, 2, 1);
+ else
+ lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeIntArray(int[] val) {
+ if (val != null)
+ if (BIG_ENDIAN)
+ lastFinished = writeArrayLE(val, INT_ARR_OFF, val.length, 4, 2);
+ else
+ lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLongArray(long[] val) {
+ if (val != null)
+ if (BIG_ENDIAN)
+ lastFinished = writeArrayLE(val, LONG_ARR_OFF, val.length, 8, 3);
+ else
+ lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLongArray(long[] val, int len) {
+ if (val != null)
+ if (BIG_ENDIAN)
+ lastFinished = writeArrayLE(val, LONG_ARR_OFF, len, 8, 3);
+ else
+ lastFinished = writeArray(val, LONG_ARR_OFF, len, len << 3);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeFloatArray(float[] val) {
+ if (val != null)
+ if (BIG_ENDIAN)
+ lastFinished = writeArrayLE(val, FLOAT_ARR_OFF, val.length, 4, 2);
+ else
+ lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeDoubleArray(double[] val) {
+ if (val != null)
+ if (BIG_ENDIAN)
+ lastFinished = writeArrayLE(val, DOUBLE_ARR_OFF, val.length, 8, 3);
+ else
+ lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeCharArray(char[] val) {
+ if (val != null) {
+ if (BIG_ENDIAN)
+ lastFinished = writeArrayLE(val, CHAR_ARR_OFF, val.length, 2, 1);
+ else
+ lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1);
+ }
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBooleanArray(boolean[] val) {
+ if (val != null)
+ lastFinished = writeArray(val, GridUnsafe.BOOLEAN_ARR_OFF, val.length, val.length);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeString(String val) {
+ if (val != null) {
+ if (curStrBackingArr == null)
+ curStrBackingArr = val.getBytes();
+
+ writeByteArray(curStrBackingArr);
+
+ if (lastFinished)
+ curStrBackingArr = null;
+ }
+ else
+ writeByteArray(null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBitSet(BitSet val) {
+ writeLongArray(val != null ? val.toLongArray() : null);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The write is resumable: {@code uuidState} records which part is written next (0 - null flag,
+ * 1 - most significant bits, 2 - least significant bits). When a part does not fit, the method returns with
+ * {@code lastFinished == false} and the next call continues from that part.
+ */
+ @Override public void writeUuid(UUID val) {
+ switch (uuidState) {
+ case 0:
+ writeBoolean(val == null); // Null flag: true means no UUID payload follows.
+
+ if (!lastFinished || val == null)
+ return;
+
+ uuidState++;
+
+ //noinspection fallthrough
+ case 1:
+ writeLong(val.getMostSignificantBits());
+
+ if (!lastFinished)
+ return;
+
+ uuidState++;
+
+ //noinspection fallthrough
+ case 2:
+ writeLong(val.getLeastSignificantBits());
+
+ if (!lastFinished)
+ return;
+
+ uuidState = 0; // All parts written: reset for the next value.
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The write is resumable: {@code uuidState} records which part is written next (0 - null flag,
+ * 1 - most significant bits of the global ID, 2 - least significant bits of the global ID, 3 - local ID).
+ * When a part does not fit, the method returns with {@code lastFinished == false} and the next call
+ * continues from that part.
+ */
+ @Override public void writeIgniteUuid(IgniteUuid val) {
+ switch (uuidState) {
+ case 0:
+ writeBoolean(val == null); // Null flag: true means no payload follows.
+
+ if (!lastFinished || val == null)
+ return;
+
+ uuidState++;
+
+ //noinspection fallthrough
+ case 1:
+ writeLong(val.globalId().getMostSignificantBits());
+
+ if (!lastFinished)
+ return;
+
+ uuidState++;
+
+ //noinspection fallthrough
+ case 2:
+ writeLong(val.globalId().getLeastSignificantBits());
+
+ if (!lastFinished)
+ return;
+
+ uuidState++;
+
+ //noinspection fallthrough
+ case 3:
+ writeLong(val.localId());
+
+ if (!lastFinished)
+ return;
+
+ uuidState = 0; // All parts written: reset for the next value.
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * A {@code null} message is written as the short {@code Short.MIN_VALUE}. A non-null message is written by
+ * the serializer that {@code msgSerFactory} creates for {@code msg.directType()}, with the call bracketed by
+ * {@code writer.beforeInnerMessageWrite()} and {@code writer.afterInnerMessageWrite(lastFinished)}. If the
+ * buffer has no room left, nothing is written and {@code lastFinished} is set to {@code false}.
+ */
+ @Override public void writeMessage(NetworkMessage msg, MessageWriter writer) {
+ if (msg != null) {
+ if (buf.hasRemaining()) {
+ try {
+ writer.beforeInnerMessageWrite();
+
+ writer.setCurrentWriteClass(msg.getClass());
+
+ MessageSerializer<NetworkMessage> serializer = msgSerFactory.createSerializer(msg.directType());
+
+ writer.setBuffer(buf);
+
+ lastFinished = serializer.writeMessage(msg, writer);
+ }
+ finally {
+ writer.afterInnerMessageWrite(lastFinished); // NOTE(review): reports the previous lastFinished if the try block threw before the assignment.
+ }
+ }
+ else
+ lastFinished = false;
+ }
+ else
+ writeShort(Short.MIN_VALUE); // Marker for a null message.
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * A {@code null} array is written as the length {@code -1}; otherwise the length is written, followed by
+ * every element via {@code write(itemType, ...)}. The write is resumable: {@code arrPos == -1} means the
+ * length has not been written yet, and {@code arrCur} keeps the element whose write is still pending
+ * ({@code NULL} when there is none).
+ */
+ @Override public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType,
+ MessageWriter writer) {
+ if (arr != null) {
+ int len = arr.length;
+
+ if (arrPos == -1) {
+ writeInt(len);
+
+ if (!lastFinished)
+ return;
+
+ arrPos = 0;
+ }
+
+ while (arrPos < len || arrCur != NULL) { // Also loops once more for an element left pending by an earlier call.
+ if (arrCur == NULL)
+ arrCur = arr[arrPos++];
+
+ write(itemType, arrCur, writer);
+
+ if (!lastFinished)
+ return; // arrCur stays set, so the next call retries this element.
+
+ arrCur = NULL;
+ }
+
+ arrPos = -1; // Done: reset for the next array.
+ }
+ else
+ writeInt(-1);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * A {@code null} collection is written as the size {@code -1}. Lists that implement {@link RandomAccess} are
+ * delegated to {@code writeRandomAccessList}; any other collection is written as its size followed by the
+ * elements in iteration order. The write is resumable: {@code it} holds the iterator between calls
+ * ({@code null} means the size has not been written yet) and {@code cur} keeps the element whose write is
+ * still pending ({@code NULL} when there is none).
+ */
+ @Override public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType,
+ MessageWriter writer) {
+ if (col != null) {
+ if (col instanceof List && col instanceof RandomAccess)
+ writeRandomAccessList((List<T>)col, itemType, writer);
+ else {
+ if (it == null) {
+ writeInt(col.size());
+
+ if (!lastFinished)
+ return;
+
+ it = col.iterator();
+ }
+
+ while (it.hasNext() || cur != NULL) { // Also loops once more for an element left pending by an earlier call.
+ if (cur == NULL)
+ cur = it.next();
+
+ write(itemType, cur, writer);
+
+ if (!lastFinished)
+ return; // cur stays set, so the next call retries this element.
+
+ cur = NULL;
+ }
+
+ it = null; // Done: reset for the next collection.
+ }
+ }
+ else
+ writeInt(-1);
+ }
+
+ /**
+ * Writes a random-access list as its size followed by its elements, fetched by index. The write is
+ * resumable through the same {@code arrPos}/{@code arrCur} state that {@code writeObjectArray} uses:
+ * {@code arrPos == -1} means the size has not been written yet, and {@code arrCur} keeps the element whose
+ * write is still pending ({@code NULL} when there is none).
+ *
+ * @param list List to write; must implement {@link RandomAccess}.
+ * @param itemType Component type.
+ * @param writer Writer passed on to {@code write} for every element.
+ */
+ private <T> void writeRandomAccessList(List<T> list, MessageCollectionItemType itemType, MessageWriter writer) {
+ assert list instanceof RandomAccess;
+
+ int size = list.size();
+
+ if (arrPos == -1) {
+ writeInt(size);
+
+ if (!lastFinished)
+ return;
+
+ arrPos = 0;
+ }
+
+ while (arrPos < size || arrCur != NULL) { // Also loops once more for an element left pending by an earlier call.
+ if (arrCur == NULL)
+ arrCur = list.get(arrPos++);
+
+ write(itemType, arrCur, writer);
+
+ if (!lastFinished)
+ return; // arrCur stays set, so the next call retries this element.
+
+ arrCur = NULL;
+ }
+
+ arrPos = -1; // Done: reset for the next list or array.
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * A {@code null} map is written as the size {@code -1}; otherwise the size is written, followed by the key
+ * and then the value of every entry. The write is resumable: {@code mapIt} holds the entry iterator between
+ * calls ({@code null} means the size has not been written yet), {@code mapCur} keeps the entry whose write is
+ * still pending ({@code NULL} when there is none) and {@code keyDone} tells whether the key of that entry has
+ * already been written.
+ */
+ @Override public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType,
+ MessageCollectionItemType valType, MessageWriter writer) {
+ if (map != null) {
+ if (mapIt == null) {
+ writeInt(map.size());
+
+ if (!lastFinished)
+ return;
+
+ mapIt = map.entrySet().iterator();
+ }
+
+ while (mapIt.hasNext() || mapCur != NULL) { // Also loops once more for an entry left pending by an earlier call.
+ if (mapCur == NULL)
+ mapCur = mapIt.next();
+
+ Map.Entry<K, V> e = (Map.Entry<K, V>) mapCur;
+
+ if (!keyDone) {
+ write(keyType, e.getKey(), writer);
+
+ if (!lastFinished)
+ return;
+
+ keyDone = true; // A resumed call skips the key and goes straight to the value.
+ }
+
+ write(valType, e.getValue(), writer);
+
+ if (!lastFinished)
+ return;
+
+ mapCur = NULL;
+ keyDone = false;
+ }
+
+ mapIt = null; // Done: reset for the next map.
+ }
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte readByte() {
+ lastFinished = buf.remaining() >= 1;
+
+ if (lastFinished) {
+ int pos = buf.position();
+
+ buf.position(pos + 1);
+
+ return GridUnsafe.getByte(heapArr, baseOff + pos);
+ }
+ else
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short readShort() {
+ lastFinished = buf.remaining() >= 2;
+
+ if (lastFinished) {
+ int pos = buf.position();
+
+ buf.position(pos + 2);
+
+ long off = baseOff + pos;
+
+ return BIG_ENDIAN ? GridUnsafe.getShortLE(heapArr, off) : GridUnsafe.getShort(heapArr, off);
+ }
+ else
+ return 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Decodes a variable-length integer: every byte contributes its low 7 bits, and a byte with the high bit
+ * clear ends the value. The accumulated number is decremented by one before it is returned
+ * ({@code Integer.MIN_VALUE} is mapped to {@code Integer.MAX_VALUE}). Partial progress is kept in
+ * {@code prim} and {@code primShift}, so the read can be resumed with more data; {@code 0} is returned
+ * and {@code lastFinished} stays {@code false} while the value is incomplete.
+ */
+ @Override public int readInt() {
+ lastFinished = false;
+
+ int val = 0;
+
+ while (buf.hasRemaining()) {
+ int pos = buf.position();
+
+ byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
+
+ buf.position(pos + 1);
+
+ prim |= ((long)b & 0x7F) << (7 * primShift); // Append the 7 payload bits of this byte.
+
+ if ((b & 0x80) == 0) { // High bit clear: this was the last byte of the value.
+ lastFinished = true;
+
+ val = (int)prim;
+
+ if (val == Integer.MIN_VALUE)
+ val = Integer.MAX_VALUE;
+ else
+ val--;
+
+ prim = 0;
+ primShift = 0;
+
+ break;
+ }
+ else
+ primShift++;
+ }
+
+ return val;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Decodes a variable-length long: every byte contributes its low 7 bits, and a byte with the high bit
+ * clear ends the value. The accumulated number is decremented by one before it is returned
+ * ({@code Long.MIN_VALUE} is mapped to {@code Long.MAX_VALUE}). Partial progress is kept in
+ * {@code prim} and {@code primShift}, so the read can be resumed with more data; {@code 0} is returned
+ * and {@code lastFinished} stays {@code false} while the value is incomplete.
+ */
+ @Override public long readLong() {
+ lastFinished = false;
+
+ long val = 0;
+
+ while (buf.hasRemaining()) {
+ int pos = buf.position();
+
+ byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
+
+ buf.position(pos + 1);
+
+ prim |= ((long)b & 0x7F) << (7 * primShift); // Append the 7 payload bits of this byte.
+
+ if ((b & 0x80) == 0) { // High bit clear: this was the last byte of the value.
+ lastFinished = true;
+
+ val = prim;
+
+ if (val == Long.MIN_VALUE)
+ val = Long.MAX_VALUE;
+ else
+ val--;
+
+ prim = 0;
+ primShift = 0;
+
+ break;
+ }
+ else
+ primShift++;
+ }
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float readFloat() {
+ lastFinished = buf.remaining() >= 4;
+
+ if (lastFinished) {
+ int pos = buf.position();
+
+ buf.position(pos + 4);
+
+ long off = baseOff + pos;
+
+ return BIG_ENDIAN ? GridUnsafe.getFloatLE(heapArr, off) : GridUnsafe.getFloat(heapArr, off);
+ }
+ else
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public double readDouble() {
+ lastFinished = buf.remaining() >= 8;
+
+ if (lastFinished) {
+ int pos = buf.position();
+
+ buf.position(pos + 8);
+
+ long off = baseOff + pos;
+
+ return BIG_ENDIAN ? GridUnsafe.getDoubleLE(heapArr, off) : GridUnsafe.getDouble(heapArr, off);
+ }
+ else
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public char readChar() {
+ lastFinished = buf.remaining() >= 2;
+
+ if (lastFinished) {
+ int pos = buf.position();
+
+ buf.position(pos + 2);
+
+ long off = baseOff + pos;
+
+ return BIG_ENDIAN ? GridUnsafe.getCharLE(heapArr, off) : GridUnsafe.getChar(heapArr, off);
+ }
+ else
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readBoolean() {
+ lastFinished = buf.hasRemaining();
+
+ if (lastFinished) {
+ int pos = buf.position();
+
+ buf.position(pos + 1);
+
+ return GridUnsafe.getBoolean(heapArr, baseOff + pos);
+ }
+ else
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] readByteArray() {
+ return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short[] readShortArray() {
+ if (BIG_ENDIAN)
+ return readArrayLE(SHORT_ARR_CREATOR, 2, 1, SHORT_ARR_OFF);
+ else
+ return readArray(SHORT_ARR_CREATOR, 1, SHORT_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] readIntArray() {
+ if (BIG_ENDIAN)
+ return readArrayLE(INT_ARR_CREATOR, 4, 2, INT_ARR_OFF);
+ else
+ return readArray(INT_ARR_CREATOR, 2, INT_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long[] readLongArray() {
+ if (BIG_ENDIAN)
+ return readArrayLE(LONG_ARR_CREATOR, 8, 3, LONG_ARR_OFF);
+ else
+ return readArray(LONG_ARR_CREATOR, 3, LONG_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public float[] readFloatArray() {
+ if (BIG_ENDIAN)
+ return readArrayLE(FLOAT_ARR_CREATOR, 4, 2, FLOAT_ARR_OFF);
+ else
+ return readArray(FLOAT_ARR_CREATOR, 2, FLOAT_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double[] readDoubleArray() {
+ if (BIG_ENDIAN)
+ return readArrayLE(DOUBLE_ARR_CREATOR, 8, 3, DOUBLE_ARR_OFF);
+ else
+ return readArray(DOUBLE_ARR_CREATOR, 3, DOUBLE_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public char[] readCharArray() {
+ if (BIG_ENDIAN)
+ return readArrayLE(CHAR_ARR_CREATOR, 2, 1, CHAR_ARR_OFF);
+ else
+ return readArray(CHAR_ARR_CREATOR, 1, CHAR_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean[] readBooleanArray() {
+ return readArray(BOOLEAN_ARR_CREATOR, 0, GridUnsafe.BOOLEAN_ARR_OFF);
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The string is built from the byte array returned by {@link #readByteArray()}; {@code null} is returned
+     * whenever that array is {@code null}.
+     * NOTE(review): {@code new String(byte[])} uses the platform default charset - confirm that writers encode
+     * with the same charset.
+     */
+    @Override public String readString() {
+        byte[] bytes = readByteArray();
+
+        if (bytes == null)
+            return null;
+
+        return new String(bytes);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The bit set is rebuilt with {@link BitSet#valueOf(long[])} from the array returned by
+     * {@link #readLongArray()}; {@code null} is returned whenever that array is {@code null}.
+     */
+    @Override public BitSet readBitSet() {
+        long[] words = readLongArray();
+
+        if (words == null)
+            return null;
+
+        return BitSet.valueOf(words);
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The read is resumable: {@code uuidState} records which part is read next (0 - null flag,
+ * 1 - most significant bits, 2 - least significant bits) and the parts already read are kept in
+ * {@code uuidMost} and {@code uuidLeast}. {@code null} is returned both for an encoded {@code null}
+ * (with {@code lastFinished == true}) and when the data ran out (with {@code lastFinished == false}).
+ */
+ @Override public UUID readUuid() {
+ switch (uuidState) {
+ case 0:
+ boolean isNull = readBoolean();
+
+ if (!lastFinished || isNull)
+ return null;
+
+ uuidState++;
+
+ //noinspection fallthrough
+ case 1:
+ uuidMost = readLong();
+
+ if (!lastFinished)
+ return null;
+
+ uuidState++;
+
+ //noinspection fallthrough
+ case 2:
+ uuidLeast = readLong();
+
+ if (!lastFinished)
+ return null;
+
+ uuidState = 0; // All parts read: reset for the next value.
+ }
+
+ UUID val = new UUID(uuidMost, uuidLeast);
+
+ uuidMost = 0;
+ uuidLeast = 0;
+
+ return val;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The read is resumable: {@code uuidState} records which part is read next (0 - null flag,
+ * 1 - most significant bits of the global ID, 2 - least significant bits of the global ID, 3 - local ID)
+ * and the parts already read are kept in {@code uuidMost}, {@code uuidLeast} and {@code uuidLocId}.
+ * {@code null} is returned both for an encoded {@code null} (with {@code lastFinished == true}) and when
+ * the data ran out (with {@code lastFinished == false}).
+ */
+ @Override public IgniteUuid readIgniteUuid() {
+ switch (uuidState) {
+ case 0:
+ boolean isNull = readBoolean();
+
+ if (!lastFinished || isNull)
+ return null;
+
+ uuidState++;
+
+ //noinspection fallthrough
+ case 1:
+ uuidMost = readLong();
+
+ if (!lastFinished)
+ return null;
+
+ uuidState++;
+
+ //noinspection fallthrough
+ case 2:
+ uuidLeast = readLong();
+
+ if (!lastFinished)
+ return null;
+
+ uuidState++;
+
+ //noinspection fallthrough
+ case 3:
+ uuidLocId = readLong();
+
+ if (!lastFinished)
+ return null;
+
+ uuidState = 0; // All parts read: reset for the next value.
+ }
+
+ IgniteUuid val = new IgniteUuid(new UUID(uuidMost, uuidLeast), uuidLocId);
+
+ uuidMost = 0;
+ uuidLeast = 0;
+ uuidLocId = 0;
+
+ return val;
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The read is resumable. First the direct type ({@code NetworkMessage.DIRECT_TYPE_SIZE} bytes) is read;
+     * {@code Short.MIN_VALUE} denotes a {@code null} message, any other value selects the deserializer created
+     * by {@code msgSerFactory}. The deserializer is kept in {@code msgDeserializer} between calls until the
+     * message body has been read completely.
+     *
+     * @param reader Reader passed to the deserializer of the nested message.
+     * @return The decoded message; {@code null} if a {@code null} message was encoded
+     *      ({@code lastFinished == true}) or if the data ran out ({@code lastFinished == false}).
+     */
+    @Override public <T extends NetworkMessage> T readMessage(MessageReader reader) {
+        if (!msgTypeDone) {
+            // Never consume a partial type marker: wait until all of its bytes are available.
+            if (buf.remaining() < NetworkMessage.DIRECT_TYPE_SIZE) {
+                lastFinished = false;
+
+                return null;
+            }
+
+            short type = readShort();
+
+            msgDeserializer = type == Short.MIN_VALUE ? null : msgSerFactory.createDeserializer(type);
+
+            msgTypeDone = true;
+        }
+
+        if (msgDeserializer != null) {
+            try {
+                reader.beforeInnerMessageRead();
+
+                reader.setCurrentReadClass(msgDeserializer.klass());
+
+                reader.setBuffer(buf);
+
+                lastFinished = msgDeserializer.readMessage(reader);
+            }
+            finally {
+                reader.afterInnerMessageRead(lastFinished);
+            }
+        }
+        else
+            lastFinished = true;
+
+        if (!lastFinished)
+            return null;
+
+        // A missing deserializer means that a null message was encoded; dereferencing it would throw an NPE.
+        NetworkMessage msg0 = msgDeserializer != null ? msgDeserializer.getMessage() : null;
+
+        msgTypeDone = false;
+        msgDeserializer = null;
+
+        return (T)msg0;
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads the element count and then that many elements via {@code read(itemType, reader)}. A count of
+ * {@code -1} yields {@code null}. The read is resumable: {@code readSize} keeps the decoded count
+ * ({@code -1} while it is unknown), {@code readItems} the number of elements already stored and
+ * {@code objArr} the partially filled array. {@code null} is also returned, with
+ * {@code lastFinished == false}, when the data ran out.
+ */
+ @Override public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls,
+ MessageReader reader) {
+ if (readSize == -1) {
+ int size = readInt();
+
+ if (!lastFinished)
+ return null;
+
+ readSize = size;
+ }
+
+ if (readSize >= 0) {
+ if (objArr == null)
+ objArr = itemCls != null ? (Object[]) Array.newInstance(itemCls, readSize) : new Object[readSize];
+
+ for (int i = readItems; i < readSize; i++) { // Continue after the elements stored by earlier calls.
+ Object item = read(itemType, reader);
+
+ if (!lastFinished)
+ return null;
+
+ objArr[i] = item;
+
+ readItems++;
+ }
+ }
+
+ readSize = -1;
+ readItems = 0;
+ cur = null; // NOTE(review): writeCollection compares cur with the NULL sentinel, not with null - confirm a stream is never used for both reading and writing.
+
+ T[] objArr0 = (T[])objArr;
+
+ objArr = null;
+
+ return objArr0;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads the element count and then that many elements via {@code read(itemType, reader)} into an
+ * {@link ArrayList}. A count of {@code -1} yields {@code null}. The read is resumable: {@code readSize}
+ * keeps the decoded count ({@code -1} while it is unknown), {@code readItems} the number of elements already
+ * added and {@code col} the partially filled collection. {@code null} is also returned, with
+ * {@code lastFinished == false}, when the data ran out.
+ */
+ @Override public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType,
+ MessageReader reader) {
+ if (readSize == -1) {
+ int size = readInt();
+
+ if (!lastFinished)
+ return null;
+
+ readSize = size;
+ }
+
+ if (readSize >= 0) {
+ if (col == null)
+ col = new ArrayList<>(readSize);
+
+ for (int i = readItems; i < readSize; i++) { // Continue after the elements added by earlier calls.
+ Object item = read(itemType, reader);
+
+ if (!lastFinished)
+ return null;
+
+ col.add(item);
+
+ readItems++;
+ }
+ }
+
+ readSize = -1;
+ readItems = 0;
+ cur = null; // NOTE(review): writeCollection compares cur with the NULL sentinel, not with null - confirm a stream is never used for both reading and writing.
+
+ C col0 = (C)col;
+
+ col = null;
+
+ return col0;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads the entry count and then that many key/value pairs. A count of {@code -1} yields {@code null}.
+ * The map is created with {@code IgniteUtils.newLinkedHashMap} when {@code linked} is {@code true} and with
+ * {@code IgniteUtils.newHashMap} otherwise. The read is resumable: {@code readSize} keeps the decoded count
+ * ({@code -1} while it is unknown), {@code readItems} the number of entries already stored, {@code keyDone}
+ * and {@code mapCur} the key of an entry whose value is still missing, and {@code map} the partially filled
+ * map. {@code null} is also returned, with {@code lastFinished == false}, when the data ran out.
+ */
+ @Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType,
+ MessageCollectionItemType valType, boolean linked, MessageReader reader) {
+ if (readSize == -1) {
+ int size = readInt();
+
+ if (!lastFinished)
+ return null;
+
+ readSize = size;
+ }
+
+ if (readSize >= 0) {
+ if (map == null)
+ map = linked ? IgniteUtils.newLinkedHashMap(readSize) : IgniteUtils.newHashMap(readSize);
+
+ for (int i = readItems; i < readSize; i++) { // Continue after the entries stored by earlier calls.
+ if (!keyDone) {
+ Object key = read(keyType, reader);
+
+ if (!lastFinished)
+ return null;
+
+ mapCur = key; // Remember the key until its value has been read.
+ keyDone = true;
+ }
+
+ Object val = read(valType, reader);
+
+ if (!lastFinished)
+ return null;
+
+ map.put(mapCur, val);
+
+ keyDone = false;
+
+ readItems++;
+ }
+ }
+
+ readSize = -1;
+ readItems = 0;
+ mapCur = null; // NOTE(review): writeMap compares mapCur with the NULL sentinel, not with null - confirm a stream is never used for both reading and writing.
+
+ M map0 = (M)map;
+
+ map = null;
+
+ return map0;
+ }
+
+ /**
+ * Writes the array length followed by a raw memory copy of the array content. The write is resumable:
+ * {@code arrOff} is {@code -1} while the length has not been written and afterwards holds the number of
+ * content bytes already copied.
+ *
+ * @param arr Primitive array to copy from.
+ * @param off Offset of the first byte to copy within {@code arr} (callers pass the array base offset,
+ *      optionally increased by a start offset).
+ * @param len Length that is written in front of the content.
+ * @param bytes Total length of the content in bytes.
+ * @return {@code true} if the array was fully written, {@code false} if the buffer ran out of space.
+ */
+ boolean writeArray(Object arr, long off, int len, int bytes) {
+ assert arr != null;
+ assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
+ assert off > 0;
+ assert len >= 0;
+ assert bytes >= 0;
+ assert bytes >= arrOff;
+
+ if (writeArrayLength(len)) // true means the length itself did not fit.
+ return false;
+
+ int toWrite = bytes - arrOff;
+ int pos = buf.position();
+ int remaining = buf.remaining();
+
+ if (toWrite <= remaining) {
+ if (toWrite > 0) {
+ GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);
+
+ buf.position(pos + toWrite);
+ }
+
+ arrOff = -1; // Done: reset for the next array.
+
+ return true;
+ }
+ else {
+ if (remaining > 0) { // Copy as much as fits and remember how far we got.
+ GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
+
+ buf.position(pos + remaining);
+
+ arrOff += remaining;
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * Writes the array length followed by the array content with the bytes of every element reversed. Only
+ * whole elements are written. The write is resumable: {@code arrOff} is {@code -1} while the length has not
+ * been written and afterwards holds the number of content bytes already written.
+ *
+ * @param arr Primitive array to copy from.
+ * @param off Offset of the first element within {@code arr} (callers pass the array base offset).
+ * @param len Length in elements; it is written in front of the content.
+ * @param typeSize Primitive type size in bytes. Needed for the byte reversal.
+ * @param shiftCnt Shift that converts an element count into a byte count ({@code len << shiftCnt}).
+ * @return {@code true} if the array was fully written, {@code false} if the buffer ran out of space.
+ */
+ boolean writeArrayLE(Object arr, long off, int len, int typeSize, int shiftCnt) {
+ assert arr != null;
+ assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
+ assert off > 0;
+ assert len >= 0;
+
+ int bytes = len << shiftCnt;
+
+ assert bytes >= arrOff;
+
+ if (writeArrayLength(len)) // true means the length itself did not fit.
+ return false;
+
+ int toWrite = (bytes - arrOff) >> shiftCnt; // Elements still to be written.
+ int remaining = buf.remaining() >> shiftCnt; // Whole elements that fit into the buffer.
+
+ if (toWrite <= remaining) {
+ writeArrayLE(arr, off, toWrite, typeSize);
+
+ arrOff = -1; // Done: reset for the next array.
+
+ return true;
+ }
+ else {
+ if (remaining > 0)
+ writeArrayLE(arr, off, remaining, typeSize); // Advances arrOff by the bytes it writes.
+
+ return false;
+ }
+ }
+
+ /**
+ * Copies {@code len} elements into the buffer, starting at the current {@code arrOff}, and writes the bytes
+ * of every element in reverse order. Advances the buffer position and {@code arrOff} by {@code typeSize}
+ * per element. The caller must ensure that the buffer has room for {@code len} elements.
+ *
+ * @param arr Primitive array to copy from.
+ * @param off Offset of the first element within {@code arr}.
+ * @param len Number of elements to write.
+ * @param typeSize Primitive type size in bytes.
+ */
+ private void writeArrayLE(Object arr, long off, int len, int typeSize) {
+ int pos = buf.position();
+
+ for (int i = 0; i < len; i++) {
+ for (int j = 0; j < typeSize; j++) {
+ byte b = GridUnsafe.getByteField(arr, off + arrOff + (typeSize - j - 1)); // Last byte of the element first.
+
+ GridUnsafe.putByte(heapArr, baseOff + pos++, b);
+ }
+
+ buf.position(pos);
+ arrOff += typeSize;
+ }
+ }
+
+    /**
+     * Writes the array length unless it has already been written ({@code arrOff != -1}). After a successful
+     * write {@code arrOff} is set to {@code 0}.
+     *
+     * @param len Length.
+     * @return {@code true} if the length could NOT be written because the buffer ran out of space,
+     *      {@code false} if the length is in the buffer (written now or by an earlier call).
+     */
+    private boolean writeArrayLength(int len) {
+        if (arrOff != -1)
+            return false;
+
+        writeInt(len);
+
+        if (!lastFinished)
+            return true;
+
+        arrOff = 0;
+
+        return false;
+    }
+
+ /**
+ * Reads the array length followed by a raw memory copy of the array content. The read is resumable:
+ * {@code tmpArr} holds the partially filled array, {@code tmpArrBytes} its total size in bytes and
+ * {@code tmpArrOff} the number of bytes already copied.
+ *
+ * @param creator Array creator.
+ * @param lenShift Shift that converts the array length into a byte count ({@code len << lenShift}).
+ * @param off Base offset of the first element within the created array.
+ * @return The array; {@code null} if the length {@code -1} was read ({@code lastFinished == true}) or if
+ *      the array was not fully read ({@code lastFinished == false}).
+ */
+ <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) {
+ assert creator != null;
+
+ if (tmpArr == null) { // No array in progress: start with the length.
+ int len = readInt();
+
+ if (!lastFinished)
+ return null;
+
+ switch (len) {
+ case -1:
+ lastFinished = true;
+
+ return null;
+
+ case 0:
+ lastFinished = true;
+
+ return creator.create(0);
+
+ default:
+ tmpArr = creator.create(len);
+ tmpArrBytes = len << lenShift;
+ }
+ }
+
+ int toRead = tmpArrBytes - tmpArrOff;
+ int remaining = buf.remaining();
+ int pos = buf.position();
+
+ lastFinished = toRead <= remaining;
+
+ if (lastFinished) {
+ GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
+
+ buf.position(pos + toRead);
+
+ T arr = (T)tmpArr;
+
+ tmpArr = null;
+ tmpArrBytes = 0;
+ tmpArrOff = 0;
+
+ return arr;
+ }
+ else { // Copy what is available and remember how far we got.
+ GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
+
+ buf.position(pos + remaining);
+
+ tmpArrOff += remaining;
+
+ return null;
+ }
+ }
+
+ /**
+ * Reads the array length followed by the array content, storing the bytes of every element in reverse
+ * order. The read is resumable at byte granularity: {@code tmpArr} holds the partially filled array,
+ * {@code tmpArrBytes} its total size in bytes, {@code tmpArrOff} the byte offset of the element being
+ * filled and {@code valReadBytes} the number of bytes of that element already stored.
+ *
+ * @param creator Array creator.
+ * @param typeSize Primitive type size in bytes.
+ * @param lenShift Shift that converts the array length into a byte count ({@code len << lenShift}).
+ * @param off Base offset of the first element within the created array.
+ * @return The array; {@code null} if the length {@code -1} was read ({@code lastFinished == true}) or if
+ *      the array was not fully read ({@code lastFinished == false}).
+ */
+ <T> T readArrayLE(ArrayCreator<T> creator, int typeSize, int lenShift, long off) {
+ assert creator != null;
+
+ if (tmpArr == null) { // No array in progress: start with the length.
+ int len = readInt();
+
+ if (!lastFinished)
+ return null;
+
+ switch (len) {
+ case -1:
+ lastFinished = true;
+
+ return null;
+
+ case 0:
+ lastFinished = true;
+
+ return creator.create(0);
+
+ default:
+ tmpArr = creator.create(len);
+ tmpArrBytes = len << lenShift;
+ }
+ }
+
+ int toRead = tmpArrBytes - tmpArrOff - valReadBytes;
+ int remaining = buf.remaining();
+
+ lastFinished = toRead <= remaining;
+
+ if (!lastFinished)
+ toRead = remaining; // Consume only what is available.
+
+ int pos = buf.position();
+
+ for (int i = 0; i < toRead; i++) {
+ byte b = GridUnsafe.getByte(heapArr, baseOff + pos + i);
+
+ GridUnsafe.putByteField(tmpArr, off + tmpArrOff + (typeSize - valReadBytes - 1), b); // Fill the element from its last byte backwards.
+
+ if (++valReadBytes == typeSize) { // Element complete: move on to the next one.
+ valReadBytes = 0;
+ tmpArrOff += typeSize;
+ }
+ }
+
+ buf.position(pos + toRead);
+
+ if (lastFinished) {
+ T arr = (T)tmpArr;
+
+ tmpArr = null;
+ tmpArrBytes = 0;
+ tmpArrOff = 0;
+
+ return arr;
+ }
+ else
+ return null;
+ }
+
+    /**
+     * Writes a single value of the given item type by dispatching to the matching {@code write*} method of
+     * this stream.
+     *
+     * @param type Item type that selects the method used to write {@code val}.
+     * @param val Value to write; it is cast to the Java type that corresponds to {@code type}.
+     * @param writer Writer used for {@code MSG} items.
+     * @throws IllegalArgumentException If {@code type} is not one of the handled constants.
+     */
+    protected void write(MessageCollectionItemType type, Object val, MessageWriter writer) {
+        switch (type) {
+            case BYTE:
+                writeByte((Byte)val);
+                return;
+
+            case SHORT:
+                writeShort((Short)val);
+                return;
+
+            case INT:
+                writeInt((Integer)val);
+                return;
+
+            case LONG:
+                writeLong((Long)val);
+                return;
+
+            case FLOAT:
+                writeFloat((Float)val);
+                return;
+
+            case DOUBLE:
+                writeDouble((Double)val);
+                return;
+
+            case CHAR:
+                writeChar((Character)val);
+                return;
+
+            case BOOLEAN:
+                writeBoolean((Boolean)val);
+                return;
+
+            case BYTE_ARR:
+                writeByteArray((byte[])val);
+                return;
+
+            case SHORT_ARR:
+                writeShortArray((short[])val);
+                return;
+
+            case INT_ARR:
+                writeIntArray((int[])val);
+                return;
+
+            case LONG_ARR:
+                writeLongArray((long[])val);
+                return;
+
+            case FLOAT_ARR:
+                writeFloatArray((float[])val);
+                return;
+
+            case DOUBLE_ARR:
+                writeDoubleArray((double[])val);
+                return;
+
+            case CHAR_ARR:
+                writeCharArray((char[])val);
+                return;
+
+            case BOOLEAN_ARR:
+                writeBooleanArray((boolean[])val);
+                return;
+
+            case STRING:
+                writeString((String)val);
+                return;
+
+            case BIT_SET:
+                writeBitSet((BitSet)val);
+                return;
+
+            case UUID:
+                writeUuid((UUID)val);
+                return;
+
+            case IGNITE_UUID:
+                writeIgniteUuid((IgniteUuid)val);
+                return;
+
+            case MSG: {
+                // The before/after callbacks are only invoked for non-null messages.
+                boolean present = val != null;
+
+                try {
+                    if (present)
+                        writer.beforeInnerMessageWrite();
+
+                    writeMessage((NetworkMessage) val, writer);
+                }
+                finally {
+                    if (present)
+                        writer.afterInnerMessageWrite(lastFinished);
+                }
+
+                return;
+            }
+
+            default:
+                throw new IllegalArgumentException("Unknown type: " + type);
+        }
+    }
+
+    /**
+     * Reads a single value of the given item type by dispatching to the matching {@code read*} method of
+     * this stream. Primitive results are returned boxed.
+     *
+     * @param type Item type that selects the method used to read the value.
+     * @param reader Reader used for {@code MSG} items.
+     * @return Value returned by the selected {@code read*} method.
+     * @throws IllegalArgumentException If {@code type} is not one of the handled constants.
+     */
+    protected Object read(MessageCollectionItemType type, MessageReader reader) {
+        Object res;
+
+        switch (type) {
+            case BYTE:
+                res = readByte();
+                break;
+
+            case SHORT:
+                res = readShort();
+                break;
+
+            case INT:
+                res = readInt();
+                break;
+
+            case LONG:
+                res = readLong();
+                break;
+
+            case FLOAT:
+                res = readFloat();
+                break;
+
+            case DOUBLE:
+                res = readDouble();
+                break;
+
+            case CHAR:
+                res = readChar();
+                break;
+
+            case BOOLEAN:
+                res = readBoolean();
+                break;
+
+            case BYTE_ARR:
+                res = readByteArray();
+                break;
+
+            case SHORT_ARR:
+                res = readShortArray();
+                break;
+
+            case INT_ARR:
+                res = readIntArray();
+                break;
+
+            case LONG_ARR:
+                res = readLongArray();
+                break;
+
+            case FLOAT_ARR:
+                res = readFloatArray();
+                break;
+
+            case DOUBLE_ARR:
+                res = readDoubleArray();
+                break;
+
+            case CHAR_ARR:
+                res = readCharArray();
+                break;
+
+            case BOOLEAN_ARR:
+                res = readBooleanArray();
+                break;
+
+            case STRING:
+                res = readString();
+                break;
+
+            case BIT_SET:
+                res = readBitSet();
+                break;
+
+            case UUID:
+                res = readUuid();
+                break;
+
+            case IGNITE_UUID:
+                res = readIgniteUuid();
+                break;
+
+            case MSG:
+                res = readMessage(reader);
+                break;
+
+            default:
+                throw new IllegalArgumentException("Unknown type: " + type);
+        }
+
+        return res;
+    }
+
+    /**
+     * Factory for the arrays that {@code readArray} and {@code readArrayLE} fill with decoded data.
+     *
+     * @param <T> Array type produced by the creator.
+     */
+    interface ArrayCreator<T> {
+        /**
+         * Creates an array with the requested number of elements.
+         *
+         * @param len Number of elements; the read methods of this class pass {@code 0} or the length decoded
+         *      from the stream.
+         * @return New array.
+         */
+        T create(int len);
+    }
+}
+
diff --git a/modules/network/src/main/java/org/apache/ignite/network/message/DefaultMessageMapperProvider.java b/modules/network/src/main/java/org/apache/ignite/network/message/DefaultMessageMapperProvider.java
deleted file mode 100644
index 32e8110..0000000
--- a/modules/network/src/main/java/org/apache/ignite/network/message/DefaultMessageMapperProvider.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network.message;
-
-import java.io.IOException;
-import org.apache.ignite.network.message.MessageDeserializer;
-import org.apache.ignite.network.message.MessageMapperProvider;
-import org.apache.ignite.network.message.MessageMappingException;
-import org.apache.ignite.network.message.MessageSerializer;
-import org.apache.ignite.network.message.NetworkMessage;
-
-/**
- * Uses JDK serialization.
- */
-public class DefaultMessageMapperProvider implements MessageMapperProvider<NetworkMessage> {
- /** {@inheritDoc} */
- @Override public MessageDeserializer<NetworkMessage> createDeserializer() {
- return reader -> {
- try {
- return (NetworkMessage)reader.stream().readObject();
- }
- catch (Exception e) {
- throw new MessageMappingException("Failed to deserialize", e);
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public MessageSerializer<NetworkMessage> createSerializer() {
- return (message, writer) -> {
- try {
- writer.stream().writeObject(message);
- }
- catch (IOException e) {
- throw new MessageMappingException("Failed to serialize", e);
- }
- };
- }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/message/MessageDeserializer.java b/modules/network/src/main/java/org/apache/ignite/network/message/MessageDeserializer.java
index 44a4164..d798ecd 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/message/MessageDeserializer.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/message/MessageDeserializer.java
@@ -25,10 +25,14 @@ import org.apache.ignite.network.internal.MessageReader;
*/
public interface MessageDeserializer<M extends NetworkMessage> {
/**
- * Read message from reader.
+ * Read a message from the reader.
* @param reader Message reader.
- * @return Read message.
+ * @return {@code true } if the message has been read completely.
* @throws MessageMappingException If failed.
*/
- M readMessage(MessageReader reader) throws MessageMappingException;
+ boolean readMessage(MessageReader reader) throws MessageMappingException;
+
+ Class<M> klass();
+
+ M getMessage();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializer.java b/modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializer.java
index bc48e2d..3ac5583 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializer.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializer.java
@@ -25,10 +25,11 @@ import org.apache.ignite.network.internal.MessageWriter;
*/
public interface MessageSerializer<M extends NetworkMessage> {
/**
- * Write message to writer.
+ * Writes a message to the writer.
* @param message Message.
* @param writer Message writer.
+ * @return {@code true } if the message was completely written.
* @throws MessageMappingException If failed.
*/
- void writeMessage(M message, MessageWriter writer) throws MessageMappingException;
+ boolean writeMessage(M message, MessageWriter writer) throws MessageMappingException;
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/message/MessageMapperProvider.java b/modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializerProvider.java
similarity index 89%
rename from modules/network/src/main/java/org/apache/ignite/network/message/MessageMapperProvider.java
rename to modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializerProvider.java
index b5edb41..b6dab48 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/message/MessageMapperProvider.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/message/MessageSerializerProvider.java
@@ -22,7 +22,7 @@ package org.apache.ignite.network.message;
* for working with {@link NetworkMessage} objects.
* @param <M> Message type.
*/
-public interface MessageMapperProvider<M extends NetworkMessage> {
+public interface MessageSerializerProvider<M extends NetworkMessage> {
/**
* Create deserializer.
* @return Message deserializer.
@@ -34,4 +34,9 @@ public interface MessageMapperProvider<M extends NetworkMessage> {
* @return Message serializer.
*/
MessageSerializer<M> createSerializer();
+
+ /**
+ * @return Message's field count.
+ */
+ byte fieldsCount();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/message/NetworkMessage.java b/modules/network/src/main/java/org/apache/ignite/network/message/NetworkMessage.java
index 50b4975..c3f0d75 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/message/NetworkMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/message/NetworkMessage.java
@@ -20,6 +20,9 @@ package org.apache.ignite.network.message;
* Message for exchange information in cluster.
*/
public interface NetworkMessage {
+ /** Size of the direct type. */
+ static final int DIRECT_TYPE_SIZE = 2;
+
/**
* @return Message type.
*/
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageCodec.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageCodec.java
deleted file mode 100644
index a20862a..0000000
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageCodec.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network.scalecube;
-
-import io.scalecube.cluster.transport.api.Message;
-import io.scalecube.cluster.transport.api.MessageCodec;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.network.message.MessageDeserializer;
-import org.apache.ignite.network.message.MessageMapperProvider;
-import org.apache.ignite.network.message.MessageMappingException;
-import org.apache.ignite.network.message.MessageSerializer;
-import org.apache.ignite.network.message.NetworkMessage;
-
-/**
- * Serializes and deserialized messages in ScaleCube cluster.
- */
-class ScaleCubeMessageCodec implements MessageCodec {
- /** Header name for {@link NetworkMessage#directType()}. */
- public static final String HEADER_MESSAGE_TYPE = "type";
-
- /** Message mappers, messageMapperProviders[message type] -> message mapper provider for message with message type. */
- private final List<MessageMapperProvider<?>> messageMappers;
-
- /**
- * Constructor.
- * @param map Message mapper map.
- */
- ScaleCubeMessageCodec(List<MessageMapperProvider<?>> mappers) {
- messageMappers = mappers;
- }
-
- /** {@inheritDoc} */
- @Override public Message deserialize(InputStream stream) throws Exception {
- Message.Builder builder = Message.builder();
- try (ObjectInputStream ois = new ObjectInputStream(stream)) {
- // headers
- int headersSize = ois.readInt();
- Map<String, String> headers = new HashMap<>(headersSize);
- for (int i = 0; i < headersSize; i++) {
- String name = ois.readUTF();
- String value = (String) ois.readObject();
- headers.put(name, value);
- }
-
- builder.headers(headers);
-
- String typeString = headers.get(HEADER_MESSAGE_TYPE);
-
- if (typeString == null) {
- builder.data(ois.readObject());
- return builder.build();
- }
-
- short type;
- try {
- type = Short.parseShort(typeString);
- }
- catch (NumberFormatException e) {
- throw new MessageMappingException("Type is not short", e);
- }
-
- MessageMapperProvider mapperProvider = messageMappers.get(type);
-
- assert mapperProvider != null : "No mapper provider defined for type " + type;
-
- MessageDeserializer deserializer = mapperProvider.createDeserializer();
-
- NetworkMessage message = deserializer.readMessage(new ScaleCubeMessageReader(ois));
-
- builder.data(message);
- }
- return builder.build();
- }
-
- /** {@inheritDoc} */
- @Override public void serialize(Message message, OutputStream stream) throws Exception {
- final Object data = message.data();
-
- if (!(data instanceof NetworkMessage)) {
- try (ObjectOutputStream oos = new ObjectOutputStream(stream)) {
- message.writeExternal(oos);
- }
- return;
- }
-
- Map<String, String> headers = message.headers();
-
- assert headers.containsKey(HEADER_MESSAGE_TYPE) : "Missing message type header";
-
- try (ObjectOutputStream oos = new ObjectOutputStream(stream)) {
- // headers
- oos.writeInt(headers.size());
- for (Map.Entry<String, String> header : headers.entrySet()) {
- oos.writeUTF(header.getKey());
- oos.writeObject(header.getValue());
- }
-
- assert data instanceof NetworkMessage : "Message data is not an instance of NetworkMessage";
-
- NetworkMessage msg = (NetworkMessage) data;
- MessageMapperProvider mapper = messageMappers.get(msg.directType());
-
- assert mapper != null : "No mapper provider defined for type " + msg.getClass();
-
- MessageSerializer serializer = mapper.createSerializer();
-
- serializer.writeMessage(msg, new ScaleCubeMessageWriter(oos));
-
- oos.flush();
- }
- }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageWriter.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageWriter.java
deleted file mode 100644
index c0a773f..0000000
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageWriter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network.scalecube;
-
-import java.io.ObjectOutputStream;
-import org.apache.ignite.network.internal.MessageWriter;
-
-/** */
-@Deprecated
-public class ScaleCubeMessageWriter implements MessageWriter {
- /** */
- private final ObjectOutputStream stream;
-
- /** */
- public ScaleCubeMessageWriter(ObjectOutputStream stream) {
- this.stream = stream;
- }
-
- /** {@inheritDoc} */
- @Override public ObjectOutputStream stream() {
- return stream;
- }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
index f176dc2..23ae537 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
@@ -28,12 +28,11 @@ import org.apache.ignite.network.NetworkCluster;
import org.apache.ignite.network.NetworkClusterEventHandler;
import org.apache.ignite.network.NetworkHandlersProvider;
import org.apache.ignite.network.NetworkMember;
-import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.message.NetworkMessage;
import static io.scalecube.cluster.transport.api.Message.fromData;
import static java.time.Duration.ofMillis;
-import static org.apache.ignite.network.scalecube.ScaleCubeMessageCodec.HEADER_MESSAGE_TYPE;
/**
* Implementation of {@link NetworkCluster} based on ScaleCube.
@@ -95,15 +94,14 @@ public class ScaleCubeNetworkCluster implements NetworkCluster {
@Override public Future<?> send(NetworkMember member, NetworkMessage msg, String corellationId) {
return cluster.send(memberResolver.resolveMember(member),
- Message.withData(msg).header(HEADER_MESSAGE_TYPE, String.valueOf(msg.directType())).
- correlationId(corellationId).build()).toFuture();
+ Message.withData(msg).correlationId(corellationId).build()
+ ).toFuture();
}
/** {@inheritDoc} */
@Override public CompletableFuture<NetworkMessage> invoke(NetworkMember member, NetworkMessage msg, long timeout) {
return cluster.requestResponse(memberResolver.resolveMember(member),
- Message.withData(msg).correlationId(UUID.randomUUID().toString()).
- header(HEADER_MESSAGE_TYPE, String.valueOf(msg.directType())).build())
+ Message.withData(msg).correlationId(UUID.randomUUID().toString()).build())
.timeout(ofMillis(timeout)).toFuture().thenApply(m -> m.data());
}
@@ -128,7 +126,6 @@ public class ScaleCubeNetworkCluster implements NetworkCluster {
private Message fromNetworkMessage(NetworkMessage message) {
return Message.builder()
.data(message)
- .header(HEADER_MESSAGE_TYPE, String.valueOf(message.directType()))
.build();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkClusterFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkClusterFactory.java
index 479e527..87fdeff 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkClusterFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkClusterFactory.java
@@ -89,8 +89,7 @@ public class ScaleCubeNetworkClusterFactory implements NetworkClusterFactory {
.config(opts -> opts
.memberAlias(localMemberName)
.transport(trans -> {
- return trans.port(localPort)
- .messageCodec(new ScaleCubeMessageCodec(clusterContext.messageMapperProviders()));
+ return trans.port(localPort);
})
)
.membership(opts -> {
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
index 1378f0f..96eb52b 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.raft.server;
import java.util.List;
import java.util.Map;
import org.apache.ignite.lang.LogWrapper;
-import org.apache.ignite.network.message.DefaultMessageMapperProvider;
import org.apache.ignite.network.Network;
import org.apache.ignite.network.NetworkCluster;
import org.apache.ignite.network.scalecube.ScaleCubeMemberResolver;
@@ -151,11 +150,12 @@ class ITRaftCounterServerTest {
new ScaleCubeNetworkClusterFactory(name, port, servers, new ScaleCubeMemberResolver())
);
- network.registerMessageMapper((short)1000, new DefaultMessageMapperProvider());
- network.registerMessageMapper((short)1001, new DefaultMessageMapperProvider());
- network.registerMessageMapper((short)1005, new DefaultMessageMapperProvider());
- network.registerMessageMapper((short)1006, new DefaultMessageMapperProvider());
- network.registerMessageMapper((short)1009, new DefaultMessageMapperProvider());
+ // TODO: IGNITE-14088: Uncomment and use real serializer provider
+// network.registerMessageMapper((short)1000, new DefaultMessageMapperProvider());
+// network.registerMessageMapper((short)1001, new DefaultMessageMapperProvider());
+// network.registerMessageMapper((short)1005, new DefaultMessageMapperProvider());
+// network.registerMessageMapper((short)1006, new DefaultMessageMapperProvider());
+// network.registerMessageMapper((short)1009, new DefaultMessageMapperProvider());
return network.start();
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
index 45c4283..c97fb29 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import org.apache.ignite.lang.LogWrapper;
-import org.apache.ignite.network.message.DefaultMessageMapperProvider;
import org.apache.ignite.network.Network;
import org.apache.ignite.network.NetworkCluster;
import org.apache.ignite.network.NetworkHandlersProvider;
@@ -117,11 +116,12 @@ public class RaftServerImpl implements RaftServer {
new ScaleCubeNetworkClusterFactory(id, localPort, List.of(), new ScaleCubeMemberResolver())
);
- network.registerMessageMapper((short)1000, new DefaultMessageMapperProvider());
- network.registerMessageMapper((short)1001, new DefaultMessageMapperProvider());
- network.registerMessageMapper((short)1005, new DefaultMessageMapperProvider());
- network.registerMessageMapper((short)1006, new DefaultMessageMapperProvider());
- network.registerMessageMapper((short)1009, new DefaultMessageMapperProvider());
+ // TODO: IGNITE-14088: Uncomment and use real serializer provider
+// network.registerMessageMapper((short)1000, new DefaultMessageMapperProvider());
+// network.registerMessageMapper((short)1001, new DefaultMessageMapperProvider());
+// network.registerMessageMapper((short)1005, new DefaultMessageMapperProvider());
+// network.registerMessageMapper((short)1006, new DefaultMessageMapperProvider());
+// network.registerMessageMapper((short)1009, new DefaultMessageMapperProvider());
server = network.start();