You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/11/19 05:33:00 UTC

[48/51] ignite git commit: Direct marshalling backward compatibility

Direct marshalling backward compatibility


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/00986d45
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/00986d45
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/00986d45

Branch: refs/heads/ignite-direct-marsh-opt
Commit: 00986d459494658b33e820064569d61a6dc5f9d3
Parents: e691188
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Nov 18 20:10:56 2015 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Nov 18 20:10:56 2015 -0800

----------------------------------------------------------------------
 .../internal/direct/DirectByteBufferStream.java | 1700 ------------------
 .../internal/direct/DirectMessageReader.java    |    6 +-
 .../direct/DirectMessageReaderState.java        |   32 +-
 .../internal/direct/DirectMessageWriter.java    |   33 +-
 .../direct/stream/DirectByteBufferStream.java   |  316 ++++
 .../stream/v1/DirectByteBufferStreamImplV1.java | 1347 ++++++++++++++
 .../stream/v2/DirectByteBufferStreamImplV2.java | 1580 ++++++++++++++++
 .../managers/communication/GridIoManager.java   |   39 +-
 .../testframework/GridSpiTestContext.java       |    5 +-
 9 files changed, 3341 insertions(+), 1717 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
deleted file mode 100644
index c55eaac..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
+++ /dev/null
@@ -1,1700 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.direct;
-
-import java.lang.reflect.Array;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.RandomAccess;
-import java.util.UUID;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-/**
- * Portable stream based on {@link ByteBuffer}.
- */
-public class DirectByteBufferStream {
-    /** */
-    private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
-    /** */
-    private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
-
-    /** */
-    private static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
-
-    /** */
-    private static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
-
-    /** */
-    private static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
-
-    /** */
-    private static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
-
-    /** */
-    private static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
-
-    /** */
-    private static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
-
-    /** */
-    private static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
-
-    /** */
-    private static final byte[] BYTE_ARR_EMPTY = new byte[0];
-
-    /** */
-    private static final short[] SHORT_ARR_EMPTY = new short[0];
-
-    /** */
-    private static final int[] INT_ARR_EMPTY = U.EMPTY_INTS;
-
-    /** */
-    private static final long[] LONG_ARR_EMPTY = U.EMPTY_LONGS;
-
-    /** */
-    private static final float[] FLOAT_ARR_EMPTY = new float[0];
-
-    /** */
-    private static final double[] DOUBLE_ARR_EMPTY = new double[0];
-
-    /** */
-    private static final char[] CHAR_ARR_EMPTY = new char[0];
-
-    /** */
-    private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0];
-
-    /** */
-    private static final ArrayCreator<byte[]> BYTE_ARR_CREATOR = new ArrayCreator<byte[]>() {
-        @Override public byte[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return BYTE_ARR_EMPTY;
-
-                default:
-                    return new byte[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<short[]> SHORT_ARR_CREATOR = new ArrayCreator<short[]>() {
-        @Override public short[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return SHORT_ARR_EMPTY;
-
-                default:
-                    return new short[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<int[]> INT_ARR_CREATOR = new ArrayCreator<int[]>() {
-        @Override public int[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return INT_ARR_EMPTY;
-
-                default:
-                    return new int[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<long[]> LONG_ARR_CREATOR = new ArrayCreator<long[]>() {
-        @Override public long[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return LONG_ARR_EMPTY;
-
-                default:
-                    return new long[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<float[]> FLOAT_ARR_CREATOR = new ArrayCreator<float[]>() {
-        @Override public float[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return FLOAT_ARR_EMPTY;
-
-                default:
-                    return new float[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<double[]> DOUBLE_ARR_CREATOR = new ArrayCreator<double[]>() {
-        @Override public double[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return DOUBLE_ARR_EMPTY;
-
-                default:
-                    return new double[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<char[]> CHAR_ARR_CREATOR = new ArrayCreator<char[]>() {
-        @Override public char[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return CHAR_ARR_EMPTY;
-
-                default:
-                    return new char[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<boolean[]> BOOLEAN_ARR_CREATOR = new ArrayCreator<boolean[]>() {
-        @Override public boolean[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return BOOLEAN_ARR_EMPTY;
-
-                default:
-                    return new boolean[len];
-            }
-        }
-    };
-
-    /** */
-    private static final Object NULL = new Object();
-
-    /** */
-    private final MessageFactory msgFactory;
-
-    /** */
-    private ByteBuffer buf;
-
-    /** */
-    private byte[] heapArr;
-
-    /** */
-    private long baseOff;
-
-    /** */
-    private int arrOff = -1;
-
-    /** */
-    private Object tmpArr;
-
-    /** */
-    private int tmpArrOff;
-
-    /** */
-    private int tmpArrBytes;
-
-    /** */
-    private boolean msgTypeDone;
-
-    /** */
-    private Message msg;
-
-    /** */
-    private Iterator<?> mapIt;
-
-    /** */
-    private Iterator<?> it;
-
-    /** */
-    private int arrPos = -1;
-
-    /** */
-    private Object arrCur = NULL;
-
-    /** */
-    private Object mapCur = NULL;
-
-    /** */
-    private Object cur = NULL;
-
-    /** */
-    private boolean keyDone;
-
-    /** */
-    private int readSize = -1;
-
-    /** */
-    private int readItems;
-
-    /** */
-    private Object[] objArr;
-
-    /** */
-    private Collection<Object> col;
-
-    /** */
-    private Map<Object, Object> map;
-
-    /** */
-    private long prim;
-
-    /** */
-    private int primShift;
-
-    /** */
-    private int uuidState;
-
-    /** */
-    private long uuidMost;
-
-    /** */
-    private long uuidLeast;
-
-    /** */
-    private long uuidLocId;
-
-    /** */
-    private boolean lastFinished;
-
-    /**
-     * @param msgFactory Message factory.
-     */
-    public DirectByteBufferStream(MessageFactory msgFactory) {
-        this.msgFactory = msgFactory;
-    }
-
-    /**
-     * @param buf Buffer.
-     */
-    public void setBuffer(ByteBuffer buf) {
-        assert buf != null;
-
-        if (this.buf != buf) {
-            this.buf = buf;
-
-            heapArr = buf.isDirect() ? null : buf.array();
-            baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : BYTE_ARR_OFF;
-        }
-    }
-
-    /**
-     * @return Number of remaining bytes.
-     */
-    public int remaining() {
-        return buf.remaining();
-    }
-
-    /**
-     * @return Whether last object was fully written or read.
-     */
-    public boolean lastFinished() {
-        return lastFinished;
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeByte(byte val) {
-        lastFinished = buf.remaining() >= 1;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putByte(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 1);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeShort(short val) {
-        lastFinished = buf.remaining() >= 2;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putShort(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 2);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeInt(int val) {
-        lastFinished = buf.remaining() >= 5;
-
-        lastFinished = buf.remaining() >= 5;
-
-        if (lastFinished) {
-            if (val == Integer.MAX_VALUE)
-                val = Integer.MIN_VALUE;
-            else
-                val++;
-
-            int pos = buf.position();
-
-            while ((val & 0xFFFF_FF80) != 0) {
-                byte b = (byte)(val | 0x80);
-
-                UNSAFE.putByte(heapArr, baseOff + pos++, b);
-
-                val >>>= 7;
-            }
-
-            UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val);
-
-            buf.position(pos);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeLong(long val) {
-        lastFinished = buf.remaining() >= 10;
-
-        if (lastFinished) {
-            if (val == Long.MAX_VALUE)
-                val = Long.MIN_VALUE;
-            else
-                val++;
-
-            int pos = buf.position();
-
-            while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
-                byte b = (byte)(val | 0x80);
-
-                UNSAFE.putByte(heapArr, baseOff + pos++, b);
-
-                val >>>= 7;
-            }
-
-            UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val);
-
-            buf.position(pos);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeFloat(float val) {
-        lastFinished = buf.remaining() >= 4;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putFloat(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 4);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeDouble(double val) {
-        lastFinished = buf.remaining() >= 8;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putDouble(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 8);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeChar(char val) {
-        lastFinished = buf.remaining() >= 2;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putChar(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 2);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeBoolean(boolean val) {
-        lastFinished = buf.remaining() >= 1;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putBoolean(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 1);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeByteArray(byte[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value.
-     * @param off Offset.
-     * @param len Length.
-     */
-    public void writeByteArray(byte[] val, long off, int len) {
-        if (val != null)
-            lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeShortArray(short[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeIntArray(int[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeLongArray(long[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeFloatArray(float[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeDoubleArray(double[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeCharArray(char[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeBooleanArray(boolean[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, BOOLEAN_ARR_OFF, val.length, val.length);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeString(String val) {
-        writeByteArray(val != null ? val.getBytes() : null);
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeBitSet(BitSet val) {
-        writeLongArray(val != null ? val.toLongArray() : null);
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeUuid(UUID val) {
-        switch (uuidState) {
-            case 0:
-                writeBoolean(val == null);
-
-                if (!lastFinished || val == null)
-                    return;
-
-                uuidState++;
-
-            case 1:
-                writeLong(val.getMostSignificantBits());
-
-                if (!lastFinished)
-                    return;
-
-                uuidState++;
-
-            case 2:
-                writeLong(val.getLeastSignificantBits());
-
-                if (!lastFinished)
-                    return;
-
-                uuidState = 0;
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeIgniteUuid(IgniteUuid val) {
-        switch (uuidState) {
-            case 0:
-                writeBoolean(val == null);
-
-                if (!lastFinished || val == null)
-                    return;
-
-                uuidState++;
-
-            case 1:
-                writeLong(val.globalId().getMostSignificantBits());
-
-                if (!lastFinished)
-                    return;
-
-                uuidState++;
-
-            case 2:
-                writeLong(val.globalId().getLeastSignificantBits());
-
-                if (!lastFinished)
-                    return;
-
-                uuidState++;
-
-            case 3:
-                writeLong(val.localId());
-
-                if (!lastFinished)
-                    return;
-
-                uuidState = 0;
-        }
-    }
-
-    /**
-     * @param msg Message.
-     */
-    public void writeMessage(Message msg, MessageWriter writer) {
-        if (msg != null) {
-            if (buf.hasRemaining()) {
-                try {
-                    writer.beforeInnerMessageWrite();
-
-                    lastFinished = msg.writeTo(buf, writer);
-                }
-                finally {
-                    writer.afterInnerMessageWrite(lastFinished);
-                }
-            }
-            else
-                lastFinished = false;
-        }
-        else
-            writeByte(Byte.MIN_VALUE);
-    }
-
-    /**
-     * @param arr Array.
-     * @param itemType Component type.
-     * @param writer Writer.
-     */
-    public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer) {
-        if (arr != null) {
-            int len = arr.length;
-
-            if (arrPos == -1) {
-                writeInt(len);
-
-                if (!lastFinished)
-                    return;
-
-                arrPos = 0;
-            }
-
-            while (arrPos < len || arrCur != NULL) {
-                if (arrCur == NULL)
-                    arrCur = arr[arrPos++];
-
-                write(itemType, arrCur, writer);
-
-                if (!lastFinished)
-                    return;
-
-                arrCur = NULL;
-            }
-
-            arrPos = -1;
-        }
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param col Collection.
-     * @param itemType Item type.
-     * @param writer Writer.
-     */
-    public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType, MessageWriter writer) {
-        if (col != null) {
-            if (it == null) {
-                writeInt(col.size());
-
-                if (!lastFinished)
-                    return;
-
-                it = col.iterator();
-            }
-
-            while (it.hasNext() || cur != NULL) {
-                if (cur == NULL)
-                    cur = it.next();
-
-                write(itemType, cur, writer);
-
-                if (!lastFinished)
-                    return;
-
-                cur = NULL;
-            }
-
-            it = null;
-        }
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param list List.
-     * @param itemType Component type.
-     * @param writer Writer.
-     */
-    public <T> void writeRandomAccessList(List<T> list, MessageCollectionItemType itemType, MessageWriter writer) {
-        if (list != null) {
-            assert list instanceof RandomAccess;
-
-            int size = list.size();
-
-            if (arrPos == -1) {
-                writeInt(size);
-
-                if (!lastFinished)
-                    return;
-
-                arrPos = 0;
-            }
-
-            while (arrPos < size || arrCur != NULL) {
-                if (arrCur == NULL)
-                    arrCur = list.get(arrPos++);
-
-                write(itemType, arrCur, writer);
-
-                if (!lastFinished)
-                    return;
-
-                arrCur = NULL;
-            }
-
-            arrPos = -1;
-        }
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param map Map.
-     * @param keyType Key type.
-     * @param valType Value type.
-     * @param writer Writer.
-     */
-    @SuppressWarnings("unchecked")
-    public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, MessageCollectionItemType valType,
-        MessageWriter writer) {
-        if (map != null) {
-            if (mapIt == null) {
-                writeInt(map.size());
-
-                if (!lastFinished)
-                    return;
-
-                mapIt = map.entrySet().iterator();
-            }
-
-            while (mapIt.hasNext() || mapCur != NULL) {
-                Map.Entry<K, V> e;
-
-                if (mapCur == NULL)
-                    mapCur = mapIt.next();
-
-                e = (Map.Entry<K, V>)mapCur;
-
-                if (!keyDone) {
-                    write(keyType, e.getKey(), writer);
-
-                    if (!lastFinished)
-                        return;
-
-                    keyDone = true;
-                }
-
-                write(valType, e.getValue(), writer);
-
-                if (!lastFinished)
-                    return;
-
-                mapCur = NULL;
-                keyDone = false;
-            }
-
-            mapIt = null;
-        }
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @return Value.
-     */
-    public byte readByte() {
-        lastFinished = buf.remaining() >= 1;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 1);
-
-            return UNSAFE.getByte(heapArr, baseOff + pos);
-        }
-        else
-            return 0;
-    }
-
-    /**
-     * @return Value.
-     */
-    public short readShort() {
-        lastFinished = buf.remaining() >= 2;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 2);
-
-            return UNSAFE.getShort(heapArr, baseOff + pos);
-        }
-        else
-            return 0;
-    }
-
-    /**
-     * @return Value.
-     */
-    public int readInt() {
-        lastFinished = false;
-
-        int val = 0;
-
-        while (buf.hasRemaining()) {
-            int pos = buf.position();
-
-            byte b = UNSAFE.getByte(heapArr, baseOff + pos);
-
-            buf.position(pos + 1);
-
-            prim |= ((long)b & 0x7F) << (7 * primShift);
-
-            if ((b & 0x80) == 0) {
-                lastFinished = true;
-
-                val = (int)prim;
-
-                if (val == Integer.MIN_VALUE)
-                    val = Integer.MAX_VALUE;
-                else
-                    val--;
-
-                prim = 0;
-                primShift = 0;
-
-                break;
-            }
-            else
-                primShift++;
-        }
-
-        return val;
-    }
-
-    /**
-     * @return Value.
-     */
-    public long readLong() {
-        lastFinished = false;
-
-        long val = 0;
-
-        while (buf.hasRemaining()) {
-            int pos = buf.position();
-
-            byte b = UNSAFE.getByte(heapArr, baseOff + pos);
-
-            buf.position(pos + 1);
-
-            prim |= ((long)b & 0x7F) << (7 * primShift);
-
-            if ((b & 0x80) == 0) {
-                lastFinished = true;
-
-                val = prim;
-
-                if (val == Long.MIN_VALUE)
-                    val = Long.MAX_VALUE;
-                else
-                    val--;
-
-                prim = 0;
-                primShift = 0;
-
-                break;
-            }
-            else
-                primShift++;
-        }
-
-        return val;
-    }
-
-    /**
-     * @return Value.
-     */
-    public float readFloat() {
-        lastFinished = buf.remaining() >= 4;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 4);
-
-            return UNSAFE.getFloat(heapArr, baseOff + pos);
-        }
-        else
-            return 0;
-    }
-
-    /**
-     * @return Value.
-     */
-    public double readDouble() {
-        lastFinished = buf.remaining() >= 8;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 8);
-
-            return UNSAFE.getDouble(heapArr, baseOff + pos);
-        }
-        else
-            return 0;
-    }
-
-    /**
-     * @return Value.
-     */
-    public char readChar() {
-        lastFinished = buf.remaining() >= 2;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 2);
-
-            return UNSAFE.getChar(heapArr, baseOff + pos);
-        }
-        else
-            return 0;
-    }
-
-    /**
-     * @return Value.
-     */
-    public boolean readBoolean() {
-        lastFinished = buf.hasRemaining();
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 1);
-
-            return UNSAFE.getBoolean(heapArr, baseOff + pos);
-        }
-        else
-            return false;
-    }
-
-    /**
-     * @return Value.
-     */
-    public byte[] readByteArray() {
-        return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF);
-    }
-
-    /**
-     /**
-      * @return Value.
-      */
-    public short[] readShortArray() {
-        return readArray(SHORT_ARR_CREATOR, 1, SHORT_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public int[] readIntArray() {
-        return readArray(INT_ARR_CREATOR, 2, INT_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public long[] readLongArray() {
-        return readArray(LONG_ARR_CREATOR, 3, LONG_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public float[] readFloatArray() {
-        return readArray(FLOAT_ARR_CREATOR, 2, FLOAT_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public double[] readDoubleArray() {
-        return readArray(DOUBLE_ARR_CREATOR, 3, DOUBLE_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public char[] readCharArray() {
-        return readArray(CHAR_ARR_CREATOR, 1, CHAR_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public boolean[] readBooleanArray() {
-        return readArray(BOOLEAN_ARR_CREATOR, 0, BOOLEAN_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public String readString() {
-        byte[] arr = readByteArray();
-
-        return arr != null ? new String(arr) : null;
-    }
-
-    /**
-     * @return Value.
-     */
-    public BitSet readBitSet() {
-        long[] arr = readLongArray();
-
-        return arr != null ? BitSet.valueOf(arr) : null;
-    }
-
-    /**
-     * @return Value.
-     */
-    public UUID readUuid() {
-        switch (uuidState) {
-            case 0:
-                boolean isNull = readBoolean();
-
-                if (!lastFinished || isNull)
-                    return null;
-
-                uuidState++;
-
-            case 1:
-                uuidMost = readLong();
-
-                if (!lastFinished)
-                    return null;
-
-                uuidState++;
-
-            case 2:
-                uuidLeast = readLong();
-
-                if (!lastFinished)
-                    return null;
-
-                uuidState = 0;
-        }
-
-        UUID val = new UUID(uuidMost, uuidLeast);
-
-        uuidMost = 0;
-        uuidLeast = 0;
-
-        return val;
-    }
-
-    /**
-     * @return Value.
-     */
-    public IgniteUuid readIgniteUuid() {
-        switch (uuidState) {
-            case 0:
-                boolean isNull = readBoolean();
-
-                if (!lastFinished || isNull)
-                    return null;
-
-                uuidState++;
-
-            case 1:
-                uuidMost = readLong();
-
-                if (!lastFinished)
-                    return null;
-
-                uuidState++;
-
-            case 2:
-                uuidLeast = readLong();
-
-                if (!lastFinished)
-                    return null;
-
-                uuidState++;
-
-            case 3:
-                uuidLocId = readLong();
-
-                if (!lastFinished)
-                    return null;
-
-                uuidState = 0;
-        }
-
-        IgniteUuid val = new IgniteUuid(new UUID(uuidMost, uuidLeast), uuidLocId);
-
-        uuidMost = 0;
-        uuidLeast = 0;
-
-        return val;
-    }
-
-    /**
-     * @return Message.
-     */
-    @SuppressWarnings("unchecked")
-    public <T extends Message> T readMessage(MessageReader reader) {
-        if (!msgTypeDone) {
-            if (!buf.hasRemaining()) {
-                lastFinished = false;
-
-                return null;
-            }
-
-            byte type = readByte();
-
-            msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type);
-
-            msgTypeDone = true;
-        }
-
-        if (msg != null) {
-            try {
-                reader.beforeInnerMessageRead();
-
-                lastFinished = msg.readFrom(buf, reader);
-            }
-            finally {
-                reader.afterInnerMessageRead(lastFinished);
-            }
-        }
-        else
-            lastFinished = true;
-
-        if (lastFinished) {
-            Message msg0 = msg;
-
-            msgTypeDone = false;
-            msg = null;
-
-            return (T)msg0;
-        }
-        else
-            return null;
-    }
-
-    /**
-     * @param itemType Component type.
-     * @param itemCls Component class.
-     * @param reader Reader.
-     * @return Array.
-     */
-    @SuppressWarnings("unchecked")
-    public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls, MessageReader reader) {
-        if (readSize == -1) {
-            int size = readInt();
-
-            if (!lastFinished)
-                return null;
-
-            readSize = size;
-        }
-
-        if (readSize >= 0) {
-            if (objArr == null)
-                objArr = itemCls != null ? (Object[])Array.newInstance(itemCls, readSize) : new Object[readSize];
-
-            for (int i = readItems; i < readSize; i++) {
-                Object item = read(itemType, reader);
-
-                if (!lastFinished)
-                    return null;
-
-                objArr[i] = item;
-
-                readItems++;
-            }
-        }
-
-        readSize = -1;
-        readItems = 0;
-        cur = null;
-
-        T[] objArr0 = (T[])objArr;
-
-        objArr = null;
-
-        return objArr0;
-    }
-
-    /**
-     * @param itemType Item type.
-     * @param reader Reader.
-     * @return Collection.
-     */
-    @SuppressWarnings("unchecked")
-    public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, MessageReader reader) {
-        if (readSize == -1) {
-            int size = readInt();
-
-            if (!lastFinished)
-                return null;
-
-            readSize = size;
-        }
-
-        if (readSize >= 0) {
-            if (col == null)
-                col = new ArrayList<>(readSize);
-
-            for (int i = readItems; i < readSize; i++) {
-                Object item = read(itemType, reader);
-
-                if (!lastFinished)
-                    return null;
-
-                col.add(item);
-
-                readItems++;
-            }
-        }
-
-        readSize = -1;
-        readItems = 0;
-        cur = null;
-
-        C col0 = (C)col;
-
-        col = null;
-
-        return col0;
-    }
-
-    /**
-     * @param keyType Key type.
-     * @param valType Value type.
-     * @param linked Whether linked map should be created.
-     * @param reader Reader.
-     * @return Map.
-     */
-    @SuppressWarnings("unchecked")
-    public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType,
-        boolean linked, MessageReader reader) {
-        if (readSize == -1) {
-            int size = readInt();
-
-            if (!lastFinished)
-                return null;
-
-            readSize = size;
-        }
-
-        if (readSize >= 0) {
-            if (map == null)
-                map = linked ? U.newLinkedHashMap(readSize) : U.newHashMap(readSize);
-
-            for (int i = readItems; i < readSize; i++) {
-                if (!keyDone) {
-                    Object key = read(keyType, reader);
-
-                    if (!lastFinished)
-                        return null;
-
-                    mapCur = key;
-                    keyDone = true;
-                }
-
-                Object val = read(valType, reader);
-
-                if (!lastFinished)
-                    return null;
-
-                map.put(mapCur, val);
-
-                keyDone = false;
-
-                readItems++;
-            }
-        }
-
-        readSize = -1;
-        readItems = 0;
-        mapCur = null;
-
-        M map0 = (M)map;
-
-        map = null;
-
-        return map0;
-    }
-
-    /**
-     * @param arr Array.
-     * @param off Offset.
-     * @param len Length.
-     * @param bytes Length in bytes.
-     * @return Whether array was fully written.
-     */
-    private boolean writeArray(Object arr, long off, int len, int bytes) {
-        assert arr != null;
-        assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
-        assert off > 0;
-        assert len >= 0;
-        assert bytes >= 0;
-        assert bytes >= arrOff;
-
-        if (arrOff == -1) {
-            writeInt(len);
-
-            if (!lastFinished)
-                return false;
-
-            arrOff = 0;
-        }
-
-        int toWrite = bytes - arrOff;
-        int pos = buf.position();
-        int remaining = buf.remaining();
-
-        if (toWrite <= remaining) {
-            if (toWrite > 0) {
-                UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);
-
-                buf.position(pos + toWrite);
-            }
-
-            arrOff = -1;
-
-            return true;
-        }
-        else {
-            if (remaining > 0) {
-                UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
-
-                buf.position(pos + remaining);
-
-                arrOff += remaining;
-            }
-
-            return false;
-        }
-    }
-
-    /**
-     * @param creator Array creator.
-     * @param lenShift Array length shift size.
-     * @param off Base offset.
-     * @return Array or special value if it was not fully read.
-     */
-    @SuppressWarnings("unchecked")
-    private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) {
-        assert creator != null;
-
-        if (tmpArr == null) {
-            int len = readInt();
-
-            if (!lastFinished)
-                return null;
-
-            switch (len) {
-                case -1:
-                    lastFinished = true;
-
-                    return null;
-
-                case 0:
-                    lastFinished = true;
-
-                    return creator.create(0);
-
-                default:
-                    tmpArr = creator.create(len);
-                    tmpArrBytes = len << lenShift;
-            }
-        }
-
-        int toRead = tmpArrBytes - tmpArrOff;
-        int remaining = buf.remaining();
-        int pos = buf.position();
-
-        lastFinished = toRead <= remaining;
-
-        if (lastFinished) {
-            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
-
-            buf.position(pos + toRead);
-
-            T arr = (T)tmpArr;
-
-            tmpArr = null;
-            tmpArrBytes = 0;
-            tmpArrOff = 0;
-
-            return arr;
-        }
-        else {
-            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
-
-            buf.position(pos + remaining);
-
-            tmpArrOff += remaining;
-
-            return null;
-        }
-    }
-
-    /**
-     * @param type Type.
-     * @param val Value.
-     * @param writer Writer.
-     */
-    private void write(MessageCollectionItemType type, Object val, MessageWriter writer) {
-        switch (type) {
-            case BYTE:
-                writeByte((Byte)val);
-
-                break;
-
-            case SHORT:
-                writeShort((Short)val);
-
-                break;
-
-            case INT:
-                writeInt((Integer)val);
-
-                break;
-
-            case LONG:
-                writeLong((Long)val);
-
-                break;
-
-            case FLOAT:
-                writeFloat((Float)val);
-
-                break;
-
-            case DOUBLE:
-                writeDouble((Double)val);
-
-                break;
-
-            case CHAR:
-                writeChar((Character)val);
-
-                break;
-
-            case BOOLEAN:
-                writeBoolean((Boolean)val);
-
-                break;
-
-            case BYTE_ARR:
-                writeByteArray((byte[])val);
-
-                break;
-
-            case SHORT_ARR:
-                writeShortArray((short[])val);
-
-                break;
-
-            case INT_ARR:
-                writeIntArray((int[])val);
-
-                break;
-
-            case LONG_ARR:
-                writeLongArray((long[])val);
-
-                break;
-
-            case FLOAT_ARR:
-                writeFloatArray((float[])val);
-
-                break;
-
-            case DOUBLE_ARR:
-                writeDoubleArray((double[])val);
-
-                break;
-
-            case CHAR_ARR:
-                writeCharArray((char[])val);
-
-                break;
-
-            case BOOLEAN_ARR:
-                writeBooleanArray((boolean[])val);
-
-                break;
-
-            case STRING:
-                writeString((String)val);
-
-                break;
-
-            case BIT_SET:
-                writeBitSet((BitSet)val);
-
-                break;
-
-            case UUID:
-                writeUuid((UUID)val);
-
-                break;
-
-            case IGNITE_UUID:
-                writeIgniteUuid((IgniteUuid)val);
-
-                break;
-
-            case MSG:
-                try {
-                    if (val != null)
-                        writer.beforeInnerMessageWrite();
-
-                    writeMessage((Message)val, writer);
-                }
-                finally {
-                    if (val != null)
-                        writer.afterInnerMessageWrite(lastFinished);
-                }
-
-                break;
-
-            default:
-                throw new IllegalArgumentException("Unknown type: " + type);
-        }
-    }
-
-    /**
-     * @param type Type.
-     * @param reader Reader.
-     * @return Value.
-     */
-    private Object read(MessageCollectionItemType type, MessageReader reader) {
-        switch (type) {
-            case BYTE:
-                return readByte();
-
-            case SHORT:
-                return readShort();
-
-            case INT:
-                return readInt();
-
-            case LONG:
-                return readLong();
-
-            case FLOAT:
-                return readFloat();
-
-            case DOUBLE:
-                return readDouble();
-
-            case CHAR:
-                return readChar();
-
-            case BOOLEAN:
-                return readBoolean();
-
-            case BYTE_ARR:
-                return readByteArray();
-
-            case SHORT_ARR:
-                return readShortArray();
-
-            case INT_ARR:
-                return readIntArray();
-
-            case LONG_ARR:
-                return readLongArray();
-
-            case FLOAT_ARR:
-                return readFloatArray();
-
-            case DOUBLE_ARR:
-                return readDoubleArray();
-
-            case CHAR_ARR:
-                return readCharArray();
-
-            case BOOLEAN_ARR:
-                return readBooleanArray();
-
-            case STRING:
-                return readString();
-
-            case BIT_SET:
-                return readBitSet();
-
-            case UUID:
-                return readUuid();
-
-            case IGNITE_UUID:
-                return readIgniteUuid();
-
-            case MSG:
-                return readMessage(reader);
-
-            default:
-                throw new IllegalArgumentException("Unknown type: " + type);
-        }
-    }
-
-    /**
-     * Array creator.
-     */
-    private static interface ArrayCreator<T> {
-        /**
-         * @param len Array length or {@code -1} if array was not fully read.
-         * @return New array.
-         */
-        public T create(int len);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index 2f91fbd..297d3e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -22,6 +22,7 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -41,9 +42,10 @@ public class DirectMessageReader implements MessageReader {
 
     /**
      * @param msgFactory Message factory.
+     * @param protoVer Protocol version.
      */
-    public DirectMessageReader(MessageFactory msgFactory) {
-        state = new DirectMessageReaderState(msgFactory);
+    public DirectMessageReader(MessageFactory msgFactory, byte protoVer) {
+        state = new DirectMessageReaderState(msgFactory, protoVer);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
index d423052..1b02213 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.direct;
 
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
+import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 
 /**
@@ -29,6 +32,9 @@ public class DirectMessageReaderState {
     /** Message factory. */
     private final MessageFactory msgFactory;
 
+    /** Protocol version. */
+    private final byte protoVer;
+
     /** Stack array. */
     private StateItem[] stack;
 
@@ -37,13 +43,15 @@ public class DirectMessageReaderState {
 
     /**
      * @param msgFactory Message factory.
+     * @param protoVer Protocol version.
      */
-    public DirectMessageReaderState(MessageFactory msgFactory) {
+    public DirectMessageReaderState(MessageFactory msgFactory, byte protoVer) {
         this.msgFactory = msgFactory;
+        this.protoVer = protoVer;
 
         stack = new StateItem[INIT_SIZE];
 
-        stack[0] = new StateItem(msgFactory);
+        stack[0] = new StateItem(msgFactory, protoVer);
     }
 
     /**
@@ -84,7 +92,7 @@ public class DirectMessageReaderState {
         }
 
         if (stack[pos] == null)
-            stack[pos] = new StateItem(msgFactory);
+            stack[pos] = new StateItem(msgFactory, protoVer);
     }
 
     /**
@@ -120,9 +128,23 @@ public class DirectMessageReaderState {
 
         /**
          * @param msgFactory Message factory.
+         * @param protoVer Protocol version.
          */
-        public StateItem(MessageFactory msgFactory) {
-            stream = new DirectByteBufferStream(msgFactory);
+        public StateItem(MessageFactory msgFactory, byte protoVer) {
+            switch (protoVer) {
+                case 1:
+                    stream = new DirectByteBufferStreamImplV1(msgFactory);
+
+                    break;
+
+                case 2:
+                    stream = new DirectByteBufferStreamImplV2(msgFactory);
+
+                    break;
+
+                default:
+                    throw new IllegalStateException("Invalid protocol version: " + protoVer);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index 3f2866f..07a037e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -20,10 +20,11 @@ package org.apache.ignite.internal.direct;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
-import java.util.RandomAccess;
 import java.util.UUID;
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
+import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -35,11 +36,31 @@ import org.jetbrains.annotations.Nullable;
  */
 public class DirectMessageWriter implements MessageWriter {
     /** Stream. */
-    private final DirectByteBufferStream stream = new DirectByteBufferStream(null);
+    private final DirectByteBufferStream stream;
 
     /** State. */
     private final DirectMessageWriterState state = new DirectMessageWriterState();
 
+    /**
+     * @param protoVer Protocol version.
+     */
+    public DirectMessageWriter(byte protoVer) {
+        switch (protoVer) {
+            case 1:
+                stream = new DirectByteBufferStreamImplV1(null);
+
+                break;
+
+            case 2:
+                stream = new DirectByteBufferStreamImplV2(null);
+
+                break;
+
+            default:
+                throw new IllegalStateException("Invalid protocol version: " + protoVer);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void setBuffer(ByteBuffer buf) {
         stream.setBuffer(buf);
@@ -215,9 +236,9 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public <T> boolean writeCollection(String name, Collection<T> col, MessageCollectionItemType itemType) {
-        if (col instanceof List && col instanceof RandomAccess)
-            stream.writeRandomAccessList((List<T>)col, itemType, this);
-        else
+//        if (col instanceof List && col instanceof RandomAccess)
+//            stream.writeRandomAccessList((List<T>)col, itemType, this);
+//        else
             stream.writeCollection(col, itemType, this);
 
         return stream.lastFinished();

http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
new file mode 100644
index 0000000..bc9de5a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct.stream;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Direct marshalling I/O stream.
+ */
+public interface DirectByteBufferStream {
+    /**
+     * @param buf Buffer.
+     */
+    public void setBuffer(ByteBuffer buf);
+
+    /**
+     * @return Number of remaining bytes.
+     */
+    public int remaining();
+
+    /**
+     * @return Whether last object was fully written or read.
+     */
+    public boolean lastFinished();
+
+    /**
+     * @param val Value.
+     */
+    public void writeByte(byte val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeShort(short val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeInt(int val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeLong(long val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeFloat(float val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeDouble(double val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeChar(char val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeBoolean(boolean val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeByteArray(byte[] val);
+
+    /**
+     * @param val Value.
+     * @param off Offset.
+     * @param len Length.
+     */
+    public void writeByteArray(byte[] val, long off, int len);
+
+    /**
+     * @param val Value.
+     */
+    public void writeShortArray(short[] val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeIntArray(int[] val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeLongArray(long[] val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeFloatArray(float[] val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeDoubleArray(double[] val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeCharArray(char[] val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeBooleanArray(boolean[] val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeString(String val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeBitSet(BitSet val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeUuid(UUID val);
+
+    /**
+     * @param val Value.
+     */
+    public void writeIgniteUuid(IgniteUuid val);
+
+    /**
+     * @param msg Message.
+     * @param writer Writer.
+     */
+    public void writeMessage(Message msg, MessageWriter writer);
+
+    /**
+     * @param arr Array.
+     * @param itemType Component type.
+     * @param writer Writer.
+     */
+    public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer);
+
+    /**
+     * @param col Collection.
+     * @param itemType Component type.
+     * @param writer Writer.
+     */
+    public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType, MessageWriter writer);
+
+    /**
+     * @param map Map.
+     * @param keyType Key type.
+     * @param valType Value type.
+     * @param writer Writer.
+     */
+    public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, MessageCollectionItemType valType,
+        MessageWriter writer);
+
+    /**
+     * @return Value.
+     */
+    public byte readByte();
+
+    /**
+     * @return Value.
+     */
+    public short readShort();
+
+    /**
+     * @return Value.
+     */
+    public int readInt();
+
+    /**
+     * @return Value.
+     */
+    public long readLong();
+
+    /**
+     * @return Value.
+     */
+    public float readFloat();
+
+    /**
+     * @return Value.
+     */
+    public double readDouble();
+
+    /**
+     * @return Value.
+     */
+    public char readChar();
+
+    /**
+     * @return Value.
+     */
+    public boolean readBoolean();
+
+    /**
+     * @return Value.
+     */
+    public byte[] readByteArray();
+
+    /**
+     * @return Value.
+     */
+    public short[] readShortArray();
+
+    /**
+     * @return Value.
+     */
+    public int[] readIntArray();
+
+    /**
+     * @return Value.
+     */
+    public long[] readLongArray();
+
+    /**
+     * @return Value.
+     */
+    public float[] readFloatArray();
+
+    /**
+     * @return Value.
+     */
+    public double[] readDoubleArray();
+
+    /**
+     * @return Value.
+     */
+    public char[] readCharArray();
+
+    /**
+     * @return Value.
+     */
+    public boolean[] readBooleanArray();
+
+    /**
+     * @return Value.
+     */
+    public String readString();
+
+    /**
+     * @return Value.
+     */
+    public BitSet readBitSet();
+
+    /**
+     * @return Value.
+     */
+    public UUID readUuid();
+
+    /**
+     * @return Value.
+     */
+    public IgniteUuid readIgniteUuid();
+
+    /**
+     * @param reader Reader.
+     * @return Message.
+     */
+    public <T extends Message> T readMessage(MessageReader reader);
+
+    /**
+     * @param itemType Item type.
+     * @param itemCls Item class.
+     * @param reader Reader.
+     * @return Array.
+     */
+    public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls, MessageReader reader);
+
+    /**
+     * @param itemType Item type.
+     * @param reader Reader.
+     * @return Collection.
+     */
+    public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, MessageReader reader);
+
+    /**
+     * @param keyType Key type.
+     * @param valType Value type.
+     * @param linked Whether linked map should be created.
+     * @param reader Reader.
+     * @return Map.
+     */
+    public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType,
+        boolean linked, MessageReader reader);
+}