You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/11/19 05:33:00 UTC
[48/51] ignite git commit: Direct marshalling backward compatibility
Direct marshalling backward compatibility
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/00986d45
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/00986d45
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/00986d45
Branch: refs/heads/ignite-direct-marsh-opt
Commit: 00986d459494658b33e820064569d61a6dc5f9d3
Parents: e691188
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Nov 18 20:10:56 2015 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Nov 18 20:10:56 2015 -0800
----------------------------------------------------------------------
.../internal/direct/DirectByteBufferStream.java | 1700 ------------------
.../internal/direct/DirectMessageReader.java | 6 +-
.../direct/DirectMessageReaderState.java | 32 +-
.../internal/direct/DirectMessageWriter.java | 33 +-
.../direct/stream/DirectByteBufferStream.java | 316 ++++
.../stream/v1/DirectByteBufferStreamImplV1.java | 1347 ++++++++++++++
.../stream/v2/DirectByteBufferStreamImplV2.java | 1580 ++++++++++++++++
.../managers/communication/GridIoManager.java | 39 +-
.../testframework/GridSpiTestContext.java | 5 +-
9 files changed, 3341 insertions(+), 1717 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
deleted file mode 100644
index c55eaac..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
+++ /dev/null
@@ -1,1700 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.direct;
-
-import java.lang.reflect.Array;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.RandomAccess;
-import java.util.UUID;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-/**
- * Portable stream based on {@link ByteBuffer}.
- */
-public class DirectByteBufferStream {
- /** */
- private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
- /** */
- private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
-
- /** */
- private static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
-
- /** */
- private static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
-
- /** */
- private static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
-
- /** */
- private static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
-
- /** */
- private static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
-
- /** */
- private static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
-
- /** */
- private static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
-
- /** */
- private static final byte[] BYTE_ARR_EMPTY = new byte[0];
-
- /** */
- private static final short[] SHORT_ARR_EMPTY = new short[0];
-
- /** */
- private static final int[] INT_ARR_EMPTY = U.EMPTY_INTS;
-
- /** */
- private static final long[] LONG_ARR_EMPTY = U.EMPTY_LONGS;
-
- /** */
- private static final float[] FLOAT_ARR_EMPTY = new float[0];
-
- /** */
- private static final double[] DOUBLE_ARR_EMPTY = new double[0];
-
- /** */
- private static final char[] CHAR_ARR_EMPTY = new char[0];
-
- /** */
- private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0];
-
- /** */
- private static final ArrayCreator<byte[]> BYTE_ARR_CREATOR = new ArrayCreator<byte[]>() {
- @Override public byte[] create(int len) {
- assert len >= 0;
-
- switch (len) {
- case 0:
- return BYTE_ARR_EMPTY;
-
- default:
- return new byte[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<short[]> SHORT_ARR_CREATOR = new ArrayCreator<short[]>() {
- @Override public short[] create(int len) {
- assert len >= 0;
-
- switch (len) {
- case 0:
- return SHORT_ARR_EMPTY;
-
- default:
- return new short[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<int[]> INT_ARR_CREATOR = new ArrayCreator<int[]>() {
- @Override public int[] create(int len) {
- assert len >= 0;
-
- switch (len) {
- case 0:
- return INT_ARR_EMPTY;
-
- default:
- return new int[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<long[]> LONG_ARR_CREATOR = new ArrayCreator<long[]>() {
- @Override public long[] create(int len) {
- assert len >= 0;
-
- switch (len) {
- case 0:
- return LONG_ARR_EMPTY;
-
- default:
- return new long[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<float[]> FLOAT_ARR_CREATOR = new ArrayCreator<float[]>() {
- @Override public float[] create(int len) {
- assert len >= 0;
-
- switch (len) {
- case 0:
- return FLOAT_ARR_EMPTY;
-
- default:
- return new float[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<double[]> DOUBLE_ARR_CREATOR = new ArrayCreator<double[]>() {
- @Override public double[] create(int len) {
- assert len >= 0;
-
- switch (len) {
- case 0:
- return DOUBLE_ARR_EMPTY;
-
- default:
- return new double[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<char[]> CHAR_ARR_CREATOR = new ArrayCreator<char[]>() {
- @Override public char[] create(int len) {
- assert len >= 0;
-
- switch (len) {
- case 0:
- return CHAR_ARR_EMPTY;
-
- default:
- return new char[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<boolean[]> BOOLEAN_ARR_CREATOR = new ArrayCreator<boolean[]>() {
- @Override public boolean[] create(int len) {
- assert len >= 0;
-
- switch (len) {
- case 0:
- return BOOLEAN_ARR_EMPTY;
-
- default:
- return new boolean[len];
- }
- }
- };
-
- /** */
- private static final Object NULL = new Object();
-
- /** */
- private final MessageFactory msgFactory;
-
- /** */
- private ByteBuffer buf;
-
- /** */
- private byte[] heapArr;
-
- /** */
- private long baseOff;
-
- /** */
- private int arrOff = -1;
-
- /** */
- private Object tmpArr;
-
- /** */
- private int tmpArrOff;
-
- /** */
- private int tmpArrBytes;
-
- /** */
- private boolean msgTypeDone;
-
- /** */
- private Message msg;
-
- /** */
- private Iterator<?> mapIt;
-
- /** */
- private Iterator<?> it;
-
- /** */
- private int arrPos = -1;
-
- /** */
- private Object arrCur = NULL;
-
- /** */
- private Object mapCur = NULL;
-
- /** */
- private Object cur = NULL;
-
- /** */
- private boolean keyDone;
-
- /** */
- private int readSize = -1;
-
- /** */
- private int readItems;
-
- /** */
- private Object[] objArr;
-
- /** */
- private Collection<Object> col;
-
- /** */
- private Map<Object, Object> map;
-
- /** */
- private long prim;
-
- /** */
- private int primShift;
-
- /** */
- private int uuidState;
-
- /** */
- private long uuidMost;
-
- /** */
- private long uuidLeast;
-
- /** */
- private long uuidLocId;
-
- /** */
- private boolean lastFinished;
-
- /**
- * @param msgFactory Message factory.
- */
- public DirectByteBufferStream(MessageFactory msgFactory) {
- this.msgFactory = msgFactory;
- }
-
- /**
- * @param buf Buffer.
- */
- public void setBuffer(ByteBuffer buf) {
- assert buf != null;
-
- if (this.buf != buf) {
- this.buf = buf;
-
- heapArr = buf.isDirect() ? null : buf.array();
- baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : BYTE_ARR_OFF;
- }
- }
-
- /**
- * @return Number of remaining bytes.
- */
- public int remaining() {
- return buf.remaining();
- }
-
- /**
- * @return Whether last object was fully written or read.
- */
- public boolean lastFinished() {
- return lastFinished;
- }
-
- /**
- * @param val Value.
- */
- public void writeByte(byte val) {
- lastFinished = buf.remaining() >= 1;
-
- if (lastFinished) {
- int pos = buf.position();
-
- UNSAFE.putByte(heapArr, baseOff + pos, val);
-
- buf.position(pos + 1);
- }
- }
-
- /**
- * @param val Value.
- */
- public void writeShort(short val) {
- lastFinished = buf.remaining() >= 2;
-
- if (lastFinished) {
- int pos = buf.position();
-
- UNSAFE.putShort(heapArr, baseOff + pos, val);
-
- buf.position(pos + 2);
- }
- }
-
- /**
- * @param val Value.
- */
- public void writeInt(int val) {
- lastFinished = buf.remaining() >= 5;
-
- lastFinished = buf.remaining() >= 5;
-
- if (lastFinished) {
- if (val == Integer.MAX_VALUE)
- val = Integer.MIN_VALUE;
- else
- val++;
-
- int pos = buf.position();
-
- while ((val & 0xFFFF_FF80) != 0) {
- byte b = (byte)(val | 0x80);
-
- UNSAFE.putByte(heapArr, baseOff + pos++, b);
-
- val >>>= 7;
- }
-
- UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val);
-
- buf.position(pos);
- }
- }
-
- /**
- * @param val Value.
- */
- public void writeLong(long val) {
- lastFinished = buf.remaining() >= 10;
-
- if (lastFinished) {
- if (val == Long.MAX_VALUE)
- val = Long.MIN_VALUE;
- else
- val++;
-
- int pos = buf.position();
-
- while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
- byte b = (byte)(val | 0x80);
-
- UNSAFE.putByte(heapArr, baseOff + pos++, b);
-
- val >>>= 7;
- }
-
- UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val);
-
- buf.position(pos);
- }
- }
-
- /**
- * @param val Value.
- */
- public void writeFloat(float val) {
- lastFinished = buf.remaining() >= 4;
-
- if (lastFinished) {
- int pos = buf.position();
-
- UNSAFE.putFloat(heapArr, baseOff + pos, val);
-
- buf.position(pos + 4);
- }
- }
-
- /**
- * @param val Value.
- */
- public void writeDouble(double val) {
- lastFinished = buf.remaining() >= 8;
-
- if (lastFinished) {
- int pos = buf.position();
-
- UNSAFE.putDouble(heapArr, baseOff + pos, val);
-
- buf.position(pos + 8);
- }
- }
-
- /**
- * @param val Value.
- */
- public void writeChar(char val) {
- lastFinished = buf.remaining() >= 2;
-
- if (lastFinished) {
- int pos = buf.position();
-
- UNSAFE.putChar(heapArr, baseOff + pos, val);
-
- buf.position(pos + 2);
- }
- }
-
- /**
- * @param val Value.
- */
- public void writeBoolean(boolean val) {
- lastFinished = buf.remaining() >= 1;
-
- if (lastFinished) {
- int pos = buf.position();
-
- UNSAFE.putBoolean(heapArr, baseOff + pos, val);
-
- buf.position(pos + 1);
- }
- }
-
- /**
- * @param val Value.
- */
- public void writeByteArray(byte[] val) {
- if (val != null)
- lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length);
- else
- writeInt(-1);
- }
-
- /**
- * @param val Value.
- * @param off Offset.
- * @param len Length.
- */
- public void writeByteArray(byte[] val, long off, int len) {
- if (val != null)
- lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len);
- else
- writeInt(-1);
- }
-
- /**
- * @param val Value.
- */
- public void writeShortArray(short[] val) {
- if (val != null)
- lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1);
- else
- writeInt(-1);
- }
-
- /**
- * @param val Value.
- */
- public void writeIntArray(int[] val) {
- if (val != null)
- lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2);
- else
- writeInt(-1);
- }
-
- /**
- * @param val Value.
- */
- public void writeLongArray(long[] val) {
- if (val != null)
- lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3);
- else
- writeInt(-1);
- }
-
- /**
- * @param val Value.
- */
- public void writeFloatArray(float[] val) {
- if (val != null)
- lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2);
- else
- writeInt(-1);
- }
-
- /**
- * @param val Value.
- */
- public void writeDoubleArray(double[] val) {
- if (val != null)
- lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3);
- else
- writeInt(-1);
- }
-
- /**
- * @param val Value.
- */
- public void writeCharArray(char[] val) {
- if (val != null)
- lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1);
- else
- writeInt(-1);
- }
-
- /**
- * @param val Value.
- */
- public void writeBooleanArray(boolean[] val) {
- if (val != null)
- lastFinished = writeArray(val, BOOLEAN_ARR_OFF, val.length, val.length);
- else
- writeInt(-1);
- }
-
- /**
- * @param val Value.
- */
- public void writeString(String val) {
- writeByteArray(val != null ? val.getBytes() : null);
- }
-
- /**
- * @param val Value.
- */
- public void writeBitSet(BitSet val) {
- writeLongArray(val != null ? val.toLongArray() : null);
- }
-
- /**
- * @param val Value.
- */
- public void writeUuid(UUID val) {
- switch (uuidState) {
- case 0:
- writeBoolean(val == null);
-
- if (!lastFinished || val == null)
- return;
-
- uuidState++;
-
- case 1:
- writeLong(val.getMostSignificantBits());
-
- if (!lastFinished)
- return;
-
- uuidState++;
-
- case 2:
- writeLong(val.getLeastSignificantBits());
-
- if (!lastFinished)
- return;
-
- uuidState = 0;
- }
- }
-
- /**
- * @param val Value.
- */
- public void writeIgniteUuid(IgniteUuid val) {
- switch (uuidState) {
- case 0:
- writeBoolean(val == null);
-
- if (!lastFinished || val == null)
- return;
-
- uuidState++;
-
- case 1:
- writeLong(val.globalId().getMostSignificantBits());
-
- if (!lastFinished)
- return;
-
- uuidState++;
-
- case 2:
- writeLong(val.globalId().getLeastSignificantBits());
-
- if (!lastFinished)
- return;
-
- uuidState++;
-
- case 3:
- writeLong(val.localId());
-
- if (!lastFinished)
- return;
-
- uuidState = 0;
- }
- }
-
- /**
- * @param msg Message.
- */
- public void writeMessage(Message msg, MessageWriter writer) {
- if (msg != null) {
- if (buf.hasRemaining()) {
- try {
- writer.beforeInnerMessageWrite();
-
- lastFinished = msg.writeTo(buf, writer);
- }
- finally {
- writer.afterInnerMessageWrite(lastFinished);
- }
- }
- else
- lastFinished = false;
- }
- else
- writeByte(Byte.MIN_VALUE);
- }
-
- /**
- * @param arr Array.
- * @param itemType Component type.
- * @param writer Writer.
- */
- public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer) {
- if (arr != null) {
- int len = arr.length;
-
- if (arrPos == -1) {
- writeInt(len);
-
- if (!lastFinished)
- return;
-
- arrPos = 0;
- }
-
- while (arrPos < len || arrCur != NULL) {
- if (arrCur == NULL)
- arrCur = arr[arrPos++];
-
- write(itemType, arrCur, writer);
-
- if (!lastFinished)
- return;
-
- arrCur = NULL;
- }
-
- arrPos = -1;
- }
- else
- writeInt(-1);
- }
-
- /**
- * @param col Collection.
- * @param itemType Item type.
- * @param writer Writer.
- */
- public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType, MessageWriter writer) {
- if (col != null) {
- if (it == null) {
- writeInt(col.size());
-
- if (!lastFinished)
- return;
-
- it = col.iterator();
- }
-
- while (it.hasNext() || cur != NULL) {
- if (cur == NULL)
- cur = it.next();
-
- write(itemType, cur, writer);
-
- if (!lastFinished)
- return;
-
- cur = NULL;
- }
-
- it = null;
- }
- else
- writeInt(-1);
- }
-
- /**
- * @param list List.
- * @param itemType Component type.
- * @param writer Writer.
- */
- public <T> void writeRandomAccessList(List<T> list, MessageCollectionItemType itemType, MessageWriter writer) {
- if (list != null) {
- assert list instanceof RandomAccess;
-
- int size = list.size();
-
- if (arrPos == -1) {
- writeInt(size);
-
- if (!lastFinished)
- return;
-
- arrPos = 0;
- }
-
- while (arrPos < size || arrCur != NULL) {
- if (arrCur == NULL)
- arrCur = list.get(arrPos++);
-
- write(itemType, arrCur, writer);
-
- if (!lastFinished)
- return;
-
- arrCur = NULL;
- }
-
- arrPos = -1;
- }
- else
- writeInt(-1);
- }
-
- /**
- * @param map Map.
- * @param keyType Key type.
- * @param valType Value type.
- * @param writer Writer.
- */
- @SuppressWarnings("unchecked")
- public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, MessageCollectionItemType valType,
- MessageWriter writer) {
- if (map != null) {
- if (mapIt == null) {
- writeInt(map.size());
-
- if (!lastFinished)
- return;
-
- mapIt = map.entrySet().iterator();
- }
-
- while (mapIt.hasNext() || mapCur != NULL) {
- Map.Entry<K, V> e;
-
- if (mapCur == NULL)
- mapCur = mapIt.next();
-
- e = (Map.Entry<K, V>)mapCur;
-
- if (!keyDone) {
- write(keyType, e.getKey(), writer);
-
- if (!lastFinished)
- return;
-
- keyDone = true;
- }
-
- write(valType, e.getValue(), writer);
-
- if (!lastFinished)
- return;
-
- mapCur = NULL;
- keyDone = false;
- }
-
- mapIt = null;
- }
- else
- writeInt(-1);
- }
-
- /**
- * @return Value.
- */
- public byte readByte() {
- lastFinished = buf.remaining() >= 1;
-
- if (lastFinished) {
- int pos = buf.position();
-
- buf.position(pos + 1);
-
- return UNSAFE.getByte(heapArr, baseOff + pos);
- }
- else
- return 0;
- }
-
- /**
- * @return Value.
- */
- public short readShort() {
- lastFinished = buf.remaining() >= 2;
-
- if (lastFinished) {
- int pos = buf.position();
-
- buf.position(pos + 2);
-
- return UNSAFE.getShort(heapArr, baseOff + pos);
- }
- else
- return 0;
- }
-
- /**
- * @return Value.
- */
- public int readInt() {
- lastFinished = false;
-
- int val = 0;
-
- while (buf.hasRemaining()) {
- int pos = buf.position();
-
- byte b = UNSAFE.getByte(heapArr, baseOff + pos);
-
- buf.position(pos + 1);
-
- prim |= ((long)b & 0x7F) << (7 * primShift);
-
- if ((b & 0x80) == 0) {
- lastFinished = true;
-
- val = (int)prim;
-
- if (val == Integer.MIN_VALUE)
- val = Integer.MAX_VALUE;
- else
- val--;
-
- prim = 0;
- primShift = 0;
-
- break;
- }
- else
- primShift++;
- }
-
- return val;
- }
-
- /**
- * @return Value.
- */
- public long readLong() {
- lastFinished = false;
-
- long val = 0;
-
- while (buf.hasRemaining()) {
- int pos = buf.position();
-
- byte b = UNSAFE.getByte(heapArr, baseOff + pos);
-
- buf.position(pos + 1);
-
- prim |= ((long)b & 0x7F) << (7 * primShift);
-
- if ((b & 0x80) == 0) {
- lastFinished = true;
-
- val = prim;
-
- if (val == Long.MIN_VALUE)
- val = Long.MAX_VALUE;
- else
- val--;
-
- prim = 0;
- primShift = 0;
-
- break;
- }
- else
- primShift++;
- }
-
- return val;
- }
-
- /**
- * @return Value.
- */
- public float readFloat() {
- lastFinished = buf.remaining() >= 4;
-
- if (lastFinished) {
- int pos = buf.position();
-
- buf.position(pos + 4);
-
- return UNSAFE.getFloat(heapArr, baseOff + pos);
- }
- else
- return 0;
- }
-
- /**
- * @return Value.
- */
- public double readDouble() {
- lastFinished = buf.remaining() >= 8;
-
- if (lastFinished) {
- int pos = buf.position();
-
- buf.position(pos + 8);
-
- return UNSAFE.getDouble(heapArr, baseOff + pos);
- }
- else
- return 0;
- }
-
- /**
- * @return Value.
- */
- public char readChar() {
- lastFinished = buf.remaining() >= 2;
-
- if (lastFinished) {
- int pos = buf.position();
-
- buf.position(pos + 2);
-
- return UNSAFE.getChar(heapArr, baseOff + pos);
- }
- else
- return 0;
- }
-
- /**
- * @return Value.
- */
- public boolean readBoolean() {
- lastFinished = buf.hasRemaining();
-
- if (lastFinished) {
- int pos = buf.position();
-
- buf.position(pos + 1);
-
- return UNSAFE.getBoolean(heapArr, baseOff + pos);
- }
- else
- return false;
- }
-
- /**
- * @return Value.
- */
- public byte[] readByteArray() {
- return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF);
- }
-
- /**
- /**
- * @return Value.
- */
- public short[] readShortArray() {
- return readArray(SHORT_ARR_CREATOR, 1, SHORT_ARR_OFF);
- }
-
- /**
- * @return Value.
- */
- public int[] readIntArray() {
- return readArray(INT_ARR_CREATOR, 2, INT_ARR_OFF);
- }
-
- /**
- * @return Value.
- */
- public long[] readLongArray() {
- return readArray(LONG_ARR_CREATOR, 3, LONG_ARR_OFF);
- }
-
- /**
- * @return Value.
- */
- public float[] readFloatArray() {
- return readArray(FLOAT_ARR_CREATOR, 2, FLOAT_ARR_OFF);
- }
-
- /**
- * @return Value.
- */
- public double[] readDoubleArray() {
- return readArray(DOUBLE_ARR_CREATOR, 3, DOUBLE_ARR_OFF);
- }
-
- /**
- * @return Value.
- */
- public char[] readCharArray() {
- return readArray(CHAR_ARR_CREATOR, 1, CHAR_ARR_OFF);
- }
-
- /**
- * @return Value.
- */
- public boolean[] readBooleanArray() {
- return readArray(BOOLEAN_ARR_CREATOR, 0, BOOLEAN_ARR_OFF);
- }
-
- /**
- * @return Value.
- */
- public String readString() {
- byte[] arr = readByteArray();
-
- return arr != null ? new String(arr) : null;
- }
-
- /**
- * @return Value.
- */
- public BitSet readBitSet() {
- long[] arr = readLongArray();
-
- return arr != null ? BitSet.valueOf(arr) : null;
- }
-
- /**
- * @return Value.
- */
- public UUID readUuid() {
- switch (uuidState) {
- case 0:
- boolean isNull = readBoolean();
-
- if (!lastFinished || isNull)
- return null;
-
- uuidState++;
-
- case 1:
- uuidMost = readLong();
-
- if (!lastFinished)
- return null;
-
- uuidState++;
-
- case 2:
- uuidLeast = readLong();
-
- if (!lastFinished)
- return null;
-
- uuidState = 0;
- }
-
- UUID val = new UUID(uuidMost, uuidLeast);
-
- uuidMost = 0;
- uuidLeast = 0;
-
- return val;
- }
-
- /**
- * @return Value.
- */
- public IgniteUuid readIgniteUuid() {
- switch (uuidState) {
- case 0:
- boolean isNull = readBoolean();
-
- if (!lastFinished || isNull)
- return null;
-
- uuidState++;
-
- case 1:
- uuidMost = readLong();
-
- if (!lastFinished)
- return null;
-
- uuidState++;
-
- case 2:
- uuidLeast = readLong();
-
- if (!lastFinished)
- return null;
-
- uuidState++;
-
- case 3:
- uuidLocId = readLong();
-
- if (!lastFinished)
- return null;
-
- uuidState = 0;
- }
-
- IgniteUuid val = new IgniteUuid(new UUID(uuidMost, uuidLeast), uuidLocId);
-
- uuidMost = 0;
- uuidLeast = 0;
-
- return val;
- }
-
- /**
- * @return Message.
- */
- @SuppressWarnings("unchecked")
- public <T extends Message> T readMessage(MessageReader reader) {
- if (!msgTypeDone) {
- if (!buf.hasRemaining()) {
- lastFinished = false;
-
- return null;
- }
-
- byte type = readByte();
-
- msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type);
-
- msgTypeDone = true;
- }
-
- if (msg != null) {
- try {
- reader.beforeInnerMessageRead();
-
- lastFinished = msg.readFrom(buf, reader);
- }
- finally {
- reader.afterInnerMessageRead(lastFinished);
- }
- }
- else
- lastFinished = true;
-
- if (lastFinished) {
- Message msg0 = msg;
-
- msgTypeDone = false;
- msg = null;
-
- return (T)msg0;
- }
- else
- return null;
- }
-
- /**
- * @param itemType Component type.
- * @param itemCls Component class.
- * @param reader Reader.
- * @return Array.
- */
- @SuppressWarnings("unchecked")
- public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls, MessageReader reader) {
- if (readSize == -1) {
- int size = readInt();
-
- if (!lastFinished)
- return null;
-
- readSize = size;
- }
-
- if (readSize >= 0) {
- if (objArr == null)
- objArr = itemCls != null ? (Object[])Array.newInstance(itemCls, readSize) : new Object[readSize];
-
- for (int i = readItems; i < readSize; i++) {
- Object item = read(itemType, reader);
-
- if (!lastFinished)
- return null;
-
- objArr[i] = item;
-
- readItems++;
- }
- }
-
- readSize = -1;
- readItems = 0;
- cur = null;
-
- T[] objArr0 = (T[])objArr;
-
- objArr = null;
-
- return objArr0;
- }
-
- /**
- * @param itemType Item type.
- * @param reader Reader.
- * @return Collection.
- */
- @SuppressWarnings("unchecked")
- public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, MessageReader reader) {
- if (readSize == -1) {
- int size = readInt();
-
- if (!lastFinished)
- return null;
-
- readSize = size;
- }
-
- if (readSize >= 0) {
- if (col == null)
- col = new ArrayList<>(readSize);
-
- for (int i = readItems; i < readSize; i++) {
- Object item = read(itemType, reader);
-
- if (!lastFinished)
- return null;
-
- col.add(item);
-
- readItems++;
- }
- }
-
- readSize = -1;
- readItems = 0;
- cur = null;
-
- C col0 = (C)col;
-
- col = null;
-
- return col0;
- }
-
- /**
- * @param keyType Key type.
- * @param valType Value type.
- * @param linked Whether linked map should be created.
- * @param reader Reader.
- * @return Map.
- */
- @SuppressWarnings("unchecked")
- public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType,
- boolean linked, MessageReader reader) {
- if (readSize == -1) {
- int size = readInt();
-
- if (!lastFinished)
- return null;
-
- readSize = size;
- }
-
- if (readSize >= 0) {
- if (map == null)
- map = linked ? U.newLinkedHashMap(readSize) : U.newHashMap(readSize);
-
- for (int i = readItems; i < readSize; i++) {
- if (!keyDone) {
- Object key = read(keyType, reader);
-
- if (!lastFinished)
- return null;
-
- mapCur = key;
- keyDone = true;
- }
-
- Object val = read(valType, reader);
-
- if (!lastFinished)
- return null;
-
- map.put(mapCur, val);
-
- keyDone = false;
-
- readItems++;
- }
- }
-
- readSize = -1;
- readItems = 0;
- mapCur = null;
-
- M map0 = (M)map;
-
- map = null;
-
- return map0;
- }
-
- /**
- * @param arr Array.
- * @param off Offset.
- * @param len Length.
- * @param bytes Length in bytes.
- * @return Whether array was fully written.
- */
- private boolean writeArray(Object arr, long off, int len, int bytes) {
- assert arr != null;
- assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
- assert off > 0;
- assert len >= 0;
- assert bytes >= 0;
- assert bytes >= arrOff;
-
- if (arrOff == -1) {
- writeInt(len);
-
- if (!lastFinished)
- return false;
-
- arrOff = 0;
- }
-
- int toWrite = bytes - arrOff;
- int pos = buf.position();
- int remaining = buf.remaining();
-
- if (toWrite <= remaining) {
- if (toWrite > 0) {
- UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);
-
- buf.position(pos + toWrite);
- }
-
- arrOff = -1;
-
- return true;
- }
- else {
- if (remaining > 0) {
- UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
-
- buf.position(pos + remaining);
-
- arrOff += remaining;
- }
-
- return false;
- }
- }
-
- /**
- * @param creator Array creator.
- * @param lenShift Array length shift size.
- * @param off Base offset.
- * @return Array or special value if it was not fully read.
- */
- @SuppressWarnings("unchecked")
- private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) {
- assert creator != null;
-
- if (tmpArr == null) {
- int len = readInt();
-
- if (!lastFinished)
- return null;
-
- switch (len) {
- case -1:
- lastFinished = true;
-
- return null;
-
- case 0:
- lastFinished = true;
-
- return creator.create(0);
-
- default:
- tmpArr = creator.create(len);
- tmpArrBytes = len << lenShift;
- }
- }
-
- int toRead = tmpArrBytes - tmpArrOff;
- int remaining = buf.remaining();
- int pos = buf.position();
-
- lastFinished = toRead <= remaining;
-
- if (lastFinished) {
- UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
-
- buf.position(pos + toRead);
-
- T arr = (T)tmpArr;
-
- tmpArr = null;
- tmpArrBytes = 0;
- tmpArrOff = 0;
-
- return arr;
- }
- else {
- UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
-
- buf.position(pos + remaining);
-
- tmpArrOff += remaining;
-
- return null;
- }
- }
-
- /**
- * @param type Type.
- * @param val Value.
- * @param writer Writer.
- */
- private void write(MessageCollectionItemType type, Object val, MessageWriter writer) {
- switch (type) {
- case BYTE:
- writeByte((Byte)val);
-
- break;
-
- case SHORT:
- writeShort((Short)val);
-
- break;
-
- case INT:
- writeInt((Integer)val);
-
- break;
-
- case LONG:
- writeLong((Long)val);
-
- break;
-
- case FLOAT:
- writeFloat((Float)val);
-
- break;
-
- case DOUBLE:
- writeDouble((Double)val);
-
- break;
-
- case CHAR:
- writeChar((Character)val);
-
- break;
-
- case BOOLEAN:
- writeBoolean((Boolean)val);
-
- break;
-
- case BYTE_ARR:
- writeByteArray((byte[])val);
-
- break;
-
- case SHORT_ARR:
- writeShortArray((short[])val);
-
- break;
-
- case INT_ARR:
- writeIntArray((int[])val);
-
- break;
-
- case LONG_ARR:
- writeLongArray((long[])val);
-
- break;
-
- case FLOAT_ARR:
- writeFloatArray((float[])val);
-
- break;
-
- case DOUBLE_ARR:
- writeDoubleArray((double[])val);
-
- break;
-
- case CHAR_ARR:
- writeCharArray((char[])val);
-
- break;
-
- case BOOLEAN_ARR:
- writeBooleanArray((boolean[])val);
-
- break;
-
- case STRING:
- writeString((String)val);
-
- break;
-
- case BIT_SET:
- writeBitSet((BitSet)val);
-
- break;
-
- case UUID:
- writeUuid((UUID)val);
-
- break;
-
- case IGNITE_UUID:
- writeIgniteUuid((IgniteUuid)val);
-
- break;
-
- case MSG:
- try {
- if (val != null)
- writer.beforeInnerMessageWrite();
-
- writeMessage((Message)val, writer);
- }
- finally {
- if (val != null)
- writer.afterInnerMessageWrite(lastFinished);
- }
-
- break;
-
- default:
- throw new IllegalArgumentException("Unknown type: " + type);
- }
- }
-
- /**
- * @param type Type.
- * @param reader Reader.
- * @return Value.
- */
- private Object read(MessageCollectionItemType type, MessageReader reader) {
- switch (type) {
- case BYTE:
- return readByte();
-
- case SHORT:
- return readShort();
-
- case INT:
- return readInt();
-
- case LONG:
- return readLong();
-
- case FLOAT:
- return readFloat();
-
- case DOUBLE:
- return readDouble();
-
- case CHAR:
- return readChar();
-
- case BOOLEAN:
- return readBoolean();
-
- case BYTE_ARR:
- return readByteArray();
-
- case SHORT_ARR:
- return readShortArray();
-
- case INT_ARR:
- return readIntArray();
-
- case LONG_ARR:
- return readLongArray();
-
- case FLOAT_ARR:
- return readFloatArray();
-
- case DOUBLE_ARR:
- return readDoubleArray();
-
- case CHAR_ARR:
- return readCharArray();
-
- case BOOLEAN_ARR:
- return readBooleanArray();
-
- case STRING:
- return readString();
-
- case BIT_SET:
- return readBitSet();
-
- case UUID:
- return readUuid();
-
- case IGNITE_UUID:
- return readIgniteUuid();
-
- case MSG:
- return readMessage(reader);
-
- default:
- throw new IllegalArgumentException("Unknown type: " + type);
- }
- }
-
- /**
- * Array creator.
- */
- private static interface ArrayCreator<T> {
- /**
- * @param len Array length or {@code -1} if array was not fully read.
- * @return New array.
- */
- public T create(int len);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index 2f91fbd..297d3e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -22,6 +22,7 @@ import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -41,9 +42,10 @@ public class DirectMessageReader implements MessageReader {
/**
* @param msgFactory Message factory, forwarded to the {@link DirectMessageReaderState}.
+ * @param protoVer Protocol version, forwarded to the {@link DirectMessageReaderState}.
*/
- public DirectMessageReader(MessageFactory msgFactory) {
- state = new DirectMessageReaderState(msgFactory);
+ public DirectMessageReader(MessageFactory msgFactory, byte protoVer) {
+ state = new DirectMessageReaderState(msgFactory, protoVer);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
index d423052..1b02213 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.direct;
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
+import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
/**
@@ -29,6 +32,9 @@ public class DirectMessageReaderState {
/** Message factory. */
private final MessageFactory msgFactory;
+ /** Protocol version. */
+ private final byte protoVer;
+
/** Stack array. */
private StateItem[] stack;
@@ -37,13 +43,15 @@ public class DirectMessageReaderState {
/**
* @param msgFactory Message factory; stored and passed to the first {@code StateItem}.
+ * @param protoVer Protocol version; stored and passed to the first {@code StateItem}.
*/
- public DirectMessageReaderState(MessageFactory msgFactory) {
+ public DirectMessageReaderState(MessageFactory msgFactory, byte protoVer) {
this.msgFactory = msgFactory;
+ this.protoVer = protoVer;
stack = new StateItem[INIT_SIZE];
- stack[0] = new StateItem(msgFactory);
+ stack[0] = new StateItem(msgFactory, protoVer);
}
/**
@@ -84,7 +92,7 @@ public class DirectMessageReaderState {
}
if (stack[pos] == null)
- stack[pos] = new StateItem(msgFactory);
+ stack[pos] = new StateItem(msgFactory, protoVer);
}
/**
@@ -120,9 +128,23 @@ public class DirectMessageReaderState {
/**
* @param msgFactory Message factory handed to the created stream implementation.
+ * @param protoVer Protocol version: {@code 1} creates the V1 stream, {@code 2} the V2 stream; any other value throws {@link IllegalStateException}.
*/
- public StateItem(MessageFactory msgFactory) {
- stream = new DirectByteBufferStream(msgFactory);
+ public StateItem(MessageFactory msgFactory, byte protoVer) {
+ switch (protoVer) {
+ case 1:
+ stream = new DirectByteBufferStreamImplV1(msgFactory);
+
+ break;
+
+ case 2:
+ stream = new DirectByteBufferStreamImplV2(msgFactory);
+
+ break;
+
+ default:
+ throw new IllegalStateException("Invalid protocol version: " + protoVer);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index 3f2866f..07a037e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -20,10 +20,11 @@ package org.apache.ignite.internal.direct;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
-import java.util.RandomAccess;
import java.util.UUID;
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
+import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -35,11 +36,31 @@ import org.jetbrains.annotations.Nullable;
*/
public class DirectMessageWriter implements MessageWriter {
/** Stream. */
- private final DirectByteBufferStream stream = new DirectByteBufferStream(null);
+ private final DirectByteBufferStream stream;
/** State. */
private final DirectMessageWriterState state = new DirectMessageWriterState();
+ /**
+ * @param protoVer Protocol version: {@code 1} creates the V1 stream, {@code 2} the V2 stream (both with a {@code null} message factory); any other value throws {@link IllegalStateException}.
+ */
+ public DirectMessageWriter(byte protoVer) {
+ switch (protoVer) {
+ case 1:
+ stream = new DirectByteBufferStreamImplV1(null);
+
+ break;
+
+ case 2:
+ stream = new DirectByteBufferStreamImplV2(null);
+
+ break;
+
+ default:
+ throw new IllegalStateException("Invalid protocol version: " + protoVer);
+ }
+ }
+
/** {@inheritDoc} */
@Override public void setBuffer(ByteBuffer buf) {
stream.setBuffer(buf);
@@ -215,9 +236,9 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public <T> boolean writeCollection(String name, Collection<T> col, MessageCollectionItemType itemType) {
- if (col instanceof List && col instanceof RandomAccess)
- stream.writeRandomAccessList((List<T>)col, itemType, this);
- else
+// if (col instanceof List && col instanceof RandomAccess)
+// stream.writeRandomAccessList((List<T>)col, itemType, this);
+// else
stream.writeCollection(col, itemType, this);
return stream.lastFinished();
http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
new file mode 100644
index 0000000..bc9de5a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct.stream;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Direct marshalling I/O stream: typed write and read operations on a {@link ByteBuffer} supplied via {@link #setBuffer(ByteBuffer)}.
+ */
+public interface DirectByteBufferStream {
+ /**
+ * @param buf Buffer to set on this stream.
+ */
+ public void setBuffer(ByteBuffer buf);
+
+ /**
+ * @return Number of remaining bytes.
+ */
+ public int remaining();
+
+ /**
+ * @return Whether last object was fully written or read.
+ */
+ public boolean lastFinished();
+
+ /**
+ * @param val Byte value to write.
+ */
+ public void writeByte(byte val);
+
+ /**
+ * @param val Short value to write.
+ */
+ public void writeShort(short val);
+
+ /**
+ * @param val Int value to write.
+ */
+ public void writeInt(int val);
+
+ /**
+ * @param val Long value to write.
+ */
+ public void writeLong(long val);
+
+ /**
+ * @param val Float value to write.
+ */
+ public void writeFloat(float val);
+
+ /**
+ * @param val Double value to write.
+ */
+ public void writeDouble(double val);
+
+ /**
+ * @param val Char value to write.
+ */
+ public void writeChar(char val);
+
+ /**
+ * @param val Boolean value to write.
+ */
+ public void writeBoolean(boolean val);
+
+ /**
+ * @param val Byte array to write.
+ */
+ public void writeByteArray(byte[] val);
+
+ /**
+ * @param val Source byte array.
+ * @param off Offset to start from (declared {@code long}; its base is not specified by this interface - confirm with implementations).
+ * @param len Length of the region to write.
+ */
+ public void writeByteArray(byte[] val, long off, int len);
+
+ /**
+ * @param val Short array to write.
+ */
+ public void writeShortArray(short[] val);
+
+ /**
+ * @param val Int array to write.
+ */
+ public void writeIntArray(int[] val);
+
+ /**
+ * @param val Long array to write.
+ */
+ public void writeLongArray(long[] val);
+
+ /**
+ * @param val Float array to write.
+ */
+ public void writeFloatArray(float[] val);
+
+ /**
+ * @param val Double array to write.
+ */
+ public void writeDoubleArray(double[] val);
+
+ /**
+ * @param val Char array to write.
+ */
+ public void writeCharArray(char[] val);
+
+ /**
+ * @param val Boolean array to write.
+ */
+ public void writeBooleanArray(boolean[] val);
+
+ /**
+ * @param val String to write.
+ */
+ public void writeString(String val);
+
+ /**
+ * @param val Bit set to write.
+ */
+ public void writeBitSet(BitSet val);
+
+ /**
+ * @param val UUID to write.
+ */
+ public void writeUuid(UUID val);
+
+ /**
+ * @param val Ignite UUID to write.
+ */
+ public void writeIgniteUuid(IgniteUuid val);
+
+ /**
+ * @param msg Message to write.
+ * @param writer Message writer supplied by the caller.
+ */
+ public void writeMessage(Message msg, MessageWriter writer);
+
+ /**
+ * @param arr Array to write.
+ * @param itemType Item type of the array components.
+ * @param writer Message writer supplied by the caller.
+ */
+ public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer);
+
+ /**
+ * @param col Collection to write.
+ * @param itemType Item type of the collection elements.
+ * @param writer Message writer supplied by the caller.
+ */
+ public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType, MessageWriter writer);
+
+ /**
+ * @param map Map to write.
+ * @param keyType Item type of the map keys.
+ * @param valType Item type of the map values.
+ * @param writer Message writer supplied by the caller.
+ */
+ public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, MessageCollectionItemType valType,
+ MessageWriter writer);
+
+ /**
+ * @return Byte value read.
+ */
+ public byte readByte();
+
+ /**
+ * @return Short value read.
+ */
+ public short readShort();
+
+ /**
+ * @return Int value read.
+ */
+ public int readInt();
+
+ /**
+ * @return Long value read.
+ */
+ public long readLong();
+
+ /**
+ * @return Float value read.
+ */
+ public float readFloat();
+
+ /**
+ * @return Double value read.
+ */
+ public double readDouble();
+
+ /**
+ * @return Char value read.
+ */
+ public char readChar();
+
+ /**
+ * @return Boolean value read.
+ */
+ public boolean readBoolean();
+
+ /**
+ * @return Byte array read.
+ */
+ public byte[] readByteArray();
+
+ /**
+ * @return Short array read.
+ */
+ public short[] readShortArray();
+
+ /**
+ * @return Int array read.
+ */
+ public int[] readIntArray();
+
+ /**
+ * @return Long array read.
+ */
+ public long[] readLongArray();
+
+ /**
+ * @return Float array read.
+ */
+ public float[] readFloatArray();
+
+ /**
+ * @return Double array read.
+ */
+ public double[] readDoubleArray();
+
+ /**
+ * @return Char array read.
+ */
+ public char[] readCharArray();
+
+ /**
+ * @return Boolean array read.
+ */
+ public boolean[] readBooleanArray();
+
+ /**
+ * @return String read.
+ */
+ public String readString();
+
+ /**
+ * @return Bit set read.
+ */
+ public BitSet readBitSet();
+
+ /**
+ * @return UUID read.
+ */
+ public UUID readUuid();
+
+ /**
+ * @return Ignite UUID read.
+ */
+ public IgniteUuid readIgniteUuid();
+
+ /**
+ * @param reader Message reader supplied by the caller.
+ * @return Message read, typed as the caller's expected {@code T}.
+ */
+ public <T extends Message> T readMessage(MessageReader reader);
+
+ /**
+ * @param itemType Item type of the array components.
+ * @param itemCls Component class of the returned array.
+ * @param reader Message reader supplied by the caller.
+ * @return Array read.
+ */
+ public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls, MessageReader reader);
+
+ /**
+ * @param itemType Item type of the collection elements.
+ * @param reader Message reader supplied by the caller.
+ * @return Collection read, typed as the caller's expected {@code C}.
+ */
+ public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, MessageReader reader);
+
+ /**
+ * @param keyType Item type of the map keys.
+ * @param valType Item type of the map values.
+ * @param linked Whether linked map should be created.
+ * @param reader Message reader supplied by the caller.
+ * @return Map read, typed as the caller's expected {@code M}.
+ */
+ public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType,
+ boolean linked, MessageReader reader);
+}