You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/01/20 14:19:27 UTC

[3/4] ignite git commit: ignite-2080 Data alignment issues with Unsafe

http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
index 1a4c4bb..444e367 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
@@ -38,7 +38,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import sun.misc.Unsafe;
 import sun.nio.ch.DirectBuffer;
 
 /**
@@ -46,33 +45,6 @@ import sun.nio.ch.DirectBuffer;
  */
 public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
     /** */
-    private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
-    /** */
-    private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
-
-    /** */
-    private static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
-
-    /** */
-    private static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
-
-    /** */
-    private static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
-
-    /** */
-    private static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
-
-    /** */
-    private static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
-
-    /** */
-    private static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
-
-    /** */
-    private static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
-
-    /** */
     private static final byte[] BYTE_ARR_EMPTY = new byte[0];
 
     /** */
@@ -322,7 +294,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
             this.buf = buf;
 
             heapArr = buf.isDirect() ? null : buf.array();
-            baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : BYTE_ARR_OFF;
+            baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : GridUnsafe.BYTE_ARR_OFF;
         }
     }
 
@@ -343,7 +315,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
         if (lastFinished) {
             int pos = buf.position();
 
-            UNSAFE.putByte(heapArr, baseOff + pos, val);
+            GridUnsafe.putByte(heapArr, baseOff + pos, val);
 
             buf.position(pos + 1);
         }
@@ -356,7 +328,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
         if (lastFinished) {
             int pos = buf.position();
 
-            UNSAFE.putShort(heapArr, baseOff + pos, val);
+            GridUnsafe.putShortAligned(heapArr, baseOff + pos, val);
 
             buf.position(pos + 2);
         }
@@ -377,12 +349,12 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
             while ((val & 0xFFFF_FF80) != 0) {
                 byte b = (byte)(val | 0x80);
 
-                UNSAFE.putByte(heapArr, baseOff + pos++, b);
+                GridUnsafe.putByte(heapArr, baseOff + pos++, b);
 
                 val >>>= 7;
             }
 
-            UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val);
+            GridUnsafe.putByte(heapArr, baseOff + pos++, (byte)val);
 
             buf.position(pos);
         }
@@ -403,12 +375,12 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
             while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
                 byte b = (byte)(val | 0x80);
 
-                UNSAFE.putByte(heapArr, baseOff + pos++, b);
+                GridUnsafe.putByte(heapArr, baseOff + pos++, b);
 
                 val >>>= 7;
             }
 
-            UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val);
+            GridUnsafe.putByte(heapArr, baseOff + pos++, (byte)val);
 
             buf.position(pos);
         }
@@ -421,7 +393,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
         if (lastFinished) {
             int pos = buf.position();
 
-            UNSAFE.putFloat(heapArr, baseOff + pos, val);
+            GridUnsafe.putFloatAligned(heapArr, baseOff + pos, val);
 
             buf.position(pos + 4);
         }
@@ -434,7 +406,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
         if (lastFinished) {
             int pos = buf.position();
 
-            UNSAFE.putDouble(heapArr, baseOff + pos, val);
+            GridUnsafe.putDoubleAligned(heapArr, baseOff + pos, val);
 
             buf.position(pos + 8);
         }
@@ -447,7 +419,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
         if (lastFinished) {
             int pos = buf.position();
 
-            UNSAFE.putChar(heapArr, baseOff + pos, val);
+            GridUnsafe.putCharAligned(heapArr, baseOff + pos, val);
 
             buf.position(pos + 2);
         }
@@ -460,7 +432,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
         if (lastFinished) {
             int pos = buf.position();
 
-            UNSAFE.putBoolean(heapArr, baseOff + pos, val);
+            GridUnsafe.putBoolean(heapArr, baseOff + pos, val);
 
             buf.position(pos + 1);
         }
@@ -469,7 +441,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override public void writeByteArray(byte[] val) {
         if (val != null)
-            lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length);
+            lastFinished = writeArray(val, GridUnsafe.BYTE_ARR_OFF, val.length, val.length);
         else
             writeInt(-1);
     }
@@ -477,7 +449,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override public void writeByteArray(byte[] val, long off, int len) {
         if (val != null)
-            lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len);
+            lastFinished = writeArray(val, GridUnsafe.BYTE_ARR_OFF + off, len, len);
         else
             writeInt(-1);
     }
@@ -485,7 +457,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override public void writeShortArray(short[] val) {
         if (val != null)
-            lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1);
+            lastFinished = writeArray(val, GridUnsafe.SHORT_ARR_OFF, val.length, val.length << 1);
         else
             writeInt(-1);
     }
@@ -493,7 +465,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override public void writeIntArray(int[] val) {
         if (val != null)
-            lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2);
+            lastFinished = writeArray(val, GridUnsafe.INT_ARR_OFF, val.length, val.length << 2);
         else
             writeInt(-1);
     }
@@ -501,7 +473,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override public void writeLongArray(long[] val) {
         if (val != null)
-            lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3);
+            lastFinished = writeArray(val, GridUnsafe.LONG_ARR_OFF, val.length, val.length << 3);
         else
             writeInt(-1);
     }
@@ -509,7 +481,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override public void writeFloatArray(float[] val) {
         if (val != null)
-            lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2);
+            lastFinished = writeArray(val, GridUnsafe.FLOAT_ARR_OFF, val.length, val.length << 2);
         else
             writeInt(-1);
     }
@@ -517,7 +489,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override public void writeDoubleArray(double[] val) {
         if (val != null)
-            lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3);
+            lastFinished = writeArray(val, GridUnsafe.DOUBLE_ARR_OFF, val.length, val.length << 3);
         else
             writeInt(-1);
     }
@@ -525,7 +497,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override public void writeCharArray(char[] val) {
         if (val != null)
-            lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1);
+            lastFinished = writeArray(val, GridUnsafe.CHAR_ARR_OFF, val.length, val.length << 1);
         else
             writeInt(-1);
     }
@@ -533,7 +505,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
     /** {@inheritDoc} */
     @Override public void writeBooleanArray(boolean[] val) {
         if (val != null)
-            lastFinished = writeArray(val, BOOLEAN_ARR_OFF, val.length, val.length);
+            lastFinished = writeArray(val, GridUnsafe.BOOLEAN_ARR_OFF, val.length, val.length);
         else
             writeInt(-1);
     }
@@ -793,7 +765,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
 
             buf.position(pos + 1);
 
-            return UNSAFE.getByte(heapArr, baseOff + pos);
+            return GridUnsafe.getByte(heapArr, baseOff + pos);
         }
         else
             return 0;
@@ -808,7 +780,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
 
             buf.position(pos + 2);
 
-            return UNSAFE.getShort(heapArr, baseOff + pos);
+            return GridUnsafe.getShortAligned(heapArr, baseOff + pos);
         }
         else
             return 0;
@@ -823,7 +795,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
         while (buf.hasRemaining()) {
             int pos = buf.position();
 
-            byte b = UNSAFE.getByte(heapArr, baseOff + pos);
+            byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
 
             buf.position(pos + 1);
 
@@ -860,7 +832,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
         while (buf.hasRemaining()) {
             int pos = buf.position();
 
-            byte b = UNSAFE.getByte(heapArr, baseOff + pos);
+            byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
 
             buf.position(pos + 1);
 
@@ -897,7 +869,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
 
             buf.position(pos + 4);
 
-            return UNSAFE.getFloat(heapArr, baseOff + pos);
+            return GridUnsafe.getFloatAligned(heapArr, baseOff + pos);
         }
         else
             return 0;
@@ -912,7 +884,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
 
             buf.position(pos + 8);
 
-            return UNSAFE.getDouble(heapArr, baseOff + pos);
+            return GridUnsafe.getDoubleAligned(heapArr, baseOff + pos);
         }
         else
             return 0;
@@ -927,7 +899,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
 
             buf.position(pos + 2);
 
-            return UNSAFE.getChar(heapArr, baseOff + pos);
+            return GridUnsafe.getCharAligned(heapArr, baseOff + pos);
         }
         else
             return 0;
@@ -942,7 +914,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
 
             buf.position(pos + 1);
 
-            return UNSAFE.getBoolean(heapArr, baseOff + pos);
+            return GridUnsafe.getBoolean(heapArr, baseOff + pos);
         }
         else
             return false;
@@ -950,42 +922,42 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
 
     /** {@inheritDoc} */
     @Override public byte[] readByteArray() {
-        return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF);
+        return readArray(BYTE_ARR_CREATOR, 0, GridUnsafe.BYTE_ARR_OFF);
     }
 
     /** {@inheritDoc} */
     @Override public short[] readShortArray() {
-        return readArray(SHORT_ARR_CREATOR, 1, SHORT_ARR_OFF);
+        return readArray(SHORT_ARR_CREATOR, 1, GridUnsafe.SHORT_ARR_OFF);
     }
 
     /** {@inheritDoc} */
     @Override public int[] readIntArray() {
-        return readArray(INT_ARR_CREATOR, 2, INT_ARR_OFF);
+        return readArray(INT_ARR_CREATOR, 2, GridUnsafe.INT_ARR_OFF);
     }
 
     /** {@inheritDoc} */
     @Override public long[] readLongArray() {
-        return readArray(LONG_ARR_CREATOR, 3, LONG_ARR_OFF);
+        return readArray(LONG_ARR_CREATOR, 3, GridUnsafe.LONG_ARR_OFF);
     }
 
     /** {@inheritDoc} */
     @Override public float[] readFloatArray() {
-        return readArray(FLOAT_ARR_CREATOR, 2, FLOAT_ARR_OFF);
+        return readArray(FLOAT_ARR_CREATOR, 2, GridUnsafe.FLOAT_ARR_OFF);
     }
 
     /** {@inheritDoc} */
     @Override public double[] readDoubleArray() {
-        return readArray(DOUBLE_ARR_CREATOR, 3, DOUBLE_ARR_OFF);
+        return readArray(DOUBLE_ARR_CREATOR, 3, GridUnsafe.DOUBLE_ARR_OFF);
     }
 
     /** {@inheritDoc} */
     @Override public char[] readCharArray() {
-        return readArray(CHAR_ARR_CREATOR, 1, CHAR_ARR_OFF);
+        return readArray(CHAR_ARR_CREATOR, 1, GridUnsafe.CHAR_ARR_OFF);
     }
 
     /** {@inheritDoc} */
     @Override public boolean[] readBooleanArray() {
-        return readArray(BOOLEAN_ARR_CREATOR, 0, BOOLEAN_ARR_OFF);
+        return readArray(BOOLEAN_ARR_CREATOR, 0, GridUnsafe.BOOLEAN_ARR_OFF);
     }
 
     /** {@inheritDoc} */
@@ -1289,7 +1261,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
 
         if (toWrite <= remaining) {
             if (toWrite > 0) {
-                UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);
+                GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);
 
                 buf.position(pos + toWrite);
             }
@@ -1300,7 +1272,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
         }
         else {
             if (remaining > 0) {
-                UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
+                GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
 
                 buf.position(pos + remaining);
 
@@ -1351,7 +1323,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
         lastFinished = toRead <= remaining;
 
         if (lastFinished) {
-            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
+            GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
 
             buf.position(pos + toRead);
 
@@ -1364,7 +1336,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
             return arr;
         }
         else {
-            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
+            GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
 
             buf.position(pos + remaining);
 
@@ -1583,7 +1555,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
     /**
      * Array creator.
      */
-    private static interface ArrayCreator<T> {
+    private interface ArrayCreator<T> {
         /**
          * @param len Array length or {@code -1} if array was not fully read.
          * @return New array.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v3/DirectByteBufferStreamImplV3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v3/DirectByteBufferStreamImplV3.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v3/DirectByteBufferStreamImplV3.java
new file mode 100644
index 0000000..5e68408
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v3/DirectByteBufferStreamImplV3.java
@@ -0,0 +1,1595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct.stream.v3;
+
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.UUID;
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import sun.nio.ch.DirectBuffer;
+
+/**
+ * Direct marshalling I/O stream (version 3).
+ */
+public class DirectByteBufferStreamImplV3 implements DirectByteBufferStream {
+    /** Whether the native byte order of this platform is big endian. */
+    private static final boolean BIG_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
+
+    /** */
+    private static final byte[] BYTE_ARR_EMPTY = new byte[0];
+
+    /** */
+    private static final short[] SHORT_ARR_EMPTY = new short[0];
+
+    /** */
+    private static final int[] INT_ARR_EMPTY = U.EMPTY_INTS;
+
+    /** */
+    private static final long[] LONG_ARR_EMPTY = U.EMPTY_LONGS;
+
+    /** */
+    private static final float[] FLOAT_ARR_EMPTY = new float[0];
+
+    /** */
+    private static final double[] DOUBLE_ARR_EMPTY = new double[0];
+
+    /** */
+    private static final char[] CHAR_ARR_EMPTY = new char[0];
+
+    /** */
+    private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0];
+
+    /** Creates {@code byte[]} arrays, reusing {@link #BYTE_ARR_EMPTY} for zero length. */
+    private static final ArrayCreator<byte[]> BYTE_ARR_CREATOR = new ArrayCreator<byte[]>() {
+        @Override public byte[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return BYTE_ARR_EMPTY;
+
+                default:
+                    return new byte[len];
+            }
+        }
+    };
+
+    /** Creates {@code short[]} arrays, reusing {@link #SHORT_ARR_EMPTY} for zero length. */
+    private static final ArrayCreator<short[]> SHORT_ARR_CREATOR = new ArrayCreator<short[]>() {
+        @Override public short[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return SHORT_ARR_EMPTY;
+
+                default:
+                    return new short[len];
+            }
+        }
+    };
+
+    /** Creates {@code int[]} arrays, reusing {@link #INT_ARR_EMPTY} for zero length. */
+    private static final ArrayCreator<int[]> INT_ARR_CREATOR = new ArrayCreator<int[]>() {
+        @Override public int[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return INT_ARR_EMPTY;
+
+                default:
+                    return new int[len];
+            }
+        }
+    };
+
+    /** Creates {@code long[]} arrays, reusing {@link #LONG_ARR_EMPTY} for zero length. */
+    private static final ArrayCreator<long[]> LONG_ARR_CREATOR = new ArrayCreator<long[]>() {
+        @Override public long[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return LONG_ARR_EMPTY;
+
+                default:
+                    return new long[len];
+            }
+        }
+    };
+
+    /** Creates {@code float[]} arrays, reusing {@link #FLOAT_ARR_EMPTY} for zero length. */
+    private static final ArrayCreator<float[]> FLOAT_ARR_CREATOR = new ArrayCreator<float[]>() {
+        @Override public float[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return FLOAT_ARR_EMPTY;
+
+                default:
+                    return new float[len];
+            }
+        }
+    };
+
+    /** Creates {@code double[]} arrays, reusing {@link #DOUBLE_ARR_EMPTY} for zero length. */
+    private static final ArrayCreator<double[]> DOUBLE_ARR_CREATOR = new ArrayCreator<double[]>() {
+        @Override public double[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return DOUBLE_ARR_EMPTY;
+
+                default:
+                    return new double[len];
+            }
+        }
+    };
+
+    /** Creates {@code char[]} arrays, reusing {@link #CHAR_ARR_EMPTY} for zero length. */
+    private static final ArrayCreator<char[]> CHAR_ARR_CREATOR = new ArrayCreator<char[]>() {
+        @Override public char[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return CHAR_ARR_EMPTY;
+
+                default:
+                    return new char[len];
+            }
+        }
+    };
+
+    /** Creates {@code boolean[]} arrays, reusing {@link #BOOLEAN_ARR_EMPTY} for zero length. */
+    private static final ArrayCreator<boolean[]> BOOLEAN_ARR_CREATOR = new ArrayCreator<boolean[]>() {
+        @Override public boolean[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return BOOLEAN_ARR_EMPTY;
+
+                default:
+                    return new boolean[len];
+            }
+        }
+    };
+
+    /** Sentinel used as the initial value of {@code arrCur}, {@code mapCur} and {@code cur}. */
+    private static final Object NULL = new Object();
+
+    /** Message factory passed to the constructor. */
+    private final MessageFactory msgFactory;
+
+    /** Current buffer, as set by {@link #setBuffer(ByteBuffer)}. */
+    private ByteBuffer buf;
+
+    /** Backing array of {@link #buf}, or {@code null} when the buffer is direct. */
+    private byte[] heapArr;
+
+    /** Native address of a direct buffer, or {@code GridUnsafe.BYTE_ARR_OFF} for a heap buffer. */
+    private long baseOff;
+
+    /** */
+    private int arrOff = -1;
+
+    /** */
+    private Object tmpArr;
+
+    /** */
+    private int tmpArrOff;
+
+    /** */
+    private int tmpArrBytes;
+
+    /** */
+    private boolean msgTypeDone;
+
+    /** */
+    private Message msg;
+
+    /** */
+    private Iterator<?> mapIt;
+
+    /** */
+    private Iterator<?> it;
+
+    /** In {@code writeObjectArray}: next array index; {@code -1} until the length has been written. */
+    private int arrPos = -1;
+
+    /** In {@code writeObjectArray}: element taken from the array, or {@link #NULL} before one is taken. */
+    private Object arrCur = NULL;
+
+    /** */
+    private Object mapCur = NULL;
+
+    /** */
+    private Object cur = NULL;
+
+    /** */
+    private boolean keyDone;
+
+    /** */
+    private int readSize = -1;
+
+    /** */
+    private int readItems;
+
+    /** */
+    private Object[] objArr;
+
+    /** */
+    private Collection<Object> col;
+
+    /** */
+    private Map<Object, Object> map;
+
+    /** */
+    private long prim;
+
+    /** */
+    private int primShift;
+
+    /** Resume point (switch case) used by {@code writeUuid} and {@code writeIgniteUuid}; starts at 0. */
+    private int uuidState;
+
+    /** */
+    private long uuidMost;
+
+    /** */
+    private long uuidLeast;
+
+    /** */
+    private long uuidLocId;
+
+    /** Whether the most recent operation completed; exposed through {@link #lastFinished()}. */
+    private boolean lastFinished;
+
+    /**
+     * @param msgFactory Message factory.
+     */
+    public DirectByteBufferStreamImplV3(MessageFactory msgFactory) {
+        this.msgFactory = msgFactory;
+    }
+
+    /** {@inheritDoc} Caches the backing array and base offset; does nothing if {@code buf} is already set. */
+    @Override public void setBuffer(ByteBuffer buf) {
+        assert buf != null;
+
+        if (this.buf != buf) {
+            this.buf = buf;
+
+            heapArr = buf.isDirect() ? null : buf.array();
+            baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : GridUnsafe.BYTE_ARR_OFF;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int remaining() {
+        return buf.remaining();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean lastFinished() {
+        return lastFinished;
+    }
+
+    /** {@inheritDoc} Writes nothing and clears {@code lastFinished} when no byte remains in the buffer. */
+    @Override public void writeByte(byte val) {
+        lastFinished = buf.remaining() >= 1;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            GridUnsafe.putByte(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 1);
+        }
+    }
+
+    /** {@inheritDoc} Needs 2 free bytes; the value is byte-swapped first on big-endian platforms. */
+    @Override public void writeShort(short val) {
+        lastFinished = buf.remaining() >= 2;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            if (BIG_ENDIAN)
+                val = Short.reverseBytes(val);
+
+            GridUnsafe.putShortAligned(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 2);
+        }
+    }
+
+    /** {@inheritDoc} Writes {@code val + 1} (wrapping) in 7-bit groups, low bits first; needs 5 free bytes. */
+    @Override public void writeInt(int val) {
+        lastFinished = buf.remaining() >= 5;
+
+        if (lastFinished) {
+            if (val == Integer.MAX_VALUE)
+                val = Integer.MIN_VALUE;
+            else
+                val++;
+
+            int pos = buf.position();
+
+            while ((val & 0xFFFF_FF80) != 0) {
+                byte b = (byte)(val | 0x80);
+
+                GridUnsafe.putByte(heapArr, baseOff + pos++, b);
+
+                val >>>= 7;
+            }
+
+            GridUnsafe.putByte(heapArr, baseOff + pos++, (byte)val);
+
+            buf.position(pos);
+        }
+    }
+
+    /** {@inheritDoc} Writes {@code val + 1} (wrapping) in 7-bit groups, low bits first; needs 10 free bytes. */
+    @Override public void writeLong(long val) {
+        lastFinished = buf.remaining() >= 10;
+
+        if (lastFinished) {
+            if (val == Long.MAX_VALUE)
+                val = Long.MIN_VALUE;
+            else
+                val++;
+
+            int pos = buf.position();
+
+            while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
+                byte b = (byte)(val | 0x80);
+
+                GridUnsafe.putByte(heapArr, baseOff + pos++, b);
+
+                val >>>= 7;
+            }
+
+            GridUnsafe.putByte(heapArr, baseOff + pos++, (byte)val);
+
+            buf.position(pos);
+        }
+    }
+
+    /** {@inheritDoc} Stores the {@code floatToIntBits} value, byte-swapped on big-endian platforms. */
+    @Override public void writeFloat(float val) {
+        lastFinished = buf.remaining() >= 4;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            int v = Float.floatToIntBits(val);
+
+            if (BIG_ENDIAN)
+                v = Integer.reverseBytes(v);
+
+            GridUnsafe.putIntAligned(heapArr, baseOff + pos, v);
+
+            buf.position(pos + 4);
+        }
+    }
+
+    /** {@inheritDoc} Stores the {@code doubleToLongBits} value, byte-swapped on big-endian platforms. */
+    @Override public void writeDouble(double val) {
+        lastFinished = buf.remaining() >= 8;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            long v = Double.doubleToLongBits(val);
+
+            if (BIG_ENDIAN)
+                v = Long.reverseBytes(v);
+
+            GridUnsafe.putLongAligned(heapArr, baseOff + pos, v);
+
+            buf.position(pos + 8);
+        }
+    }
+
+    /** {@inheritDoc} Needs 2 free bytes; the value is byte-swapped first on big-endian platforms. */
+    @Override public void writeChar(char val) {
+        lastFinished = buf.remaining() >= 2;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            if (BIG_ENDIAN)
+                val = Character.reverseBytes(val);
+
+            GridUnsafe.putCharAligned(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 2);
+        }
+    }
+
+    /** {@inheritDoc} Writes nothing and clears {@code lastFinished} when no byte remains in the buffer. */
+    @Override public void writeBoolean(boolean val) {
+        lastFinished = buf.remaining() >= 1;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            GridUnsafe.putBoolean(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 1);
+        }
+    }
+
+    /** {@inheritDoc} A {@code null} array is written as the int {@code -1}. */
+    @Override public void writeByteArray(byte[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, GridUnsafe.BYTE_ARR_OFF, val.length, val.length);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} {@code off} is a byte offset into {@code val}; {@code null} is written as the int -1. */
+    @Override public void writeByteArray(byte[] val, long off, int len) {
+        if (val != null)
+            lastFinished = writeArray(val, GridUnsafe.BYTE_ARR_OFF + off, len, len);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} A {@code null} array is written as the int {@code -1}. */
+    @Override public void writeShortArray(short[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, GridUnsafe.SHORT_ARR_OFF, val.length, val.length << 1);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} A {@code null} array is written as the int {@code -1}. */
+    @Override public void writeIntArray(int[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, GridUnsafe.INT_ARR_OFF, val.length, val.length << 2);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} A {@code null} array is written as the int {@code -1}. */
+    @Override public void writeLongArray(long[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, GridUnsafe.LONG_ARR_OFF, val.length, val.length << 3);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} A {@code null} array is written as the int {@code -1}. */
+    @Override public void writeFloatArray(float[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, GridUnsafe.FLOAT_ARR_OFF, val.length, val.length << 2);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} A {@code null} array is written as the int {@code -1}. */
+    @Override public void writeDoubleArray(double[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, GridUnsafe.DOUBLE_ARR_OFF, val.length, val.length << 3);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} A {@code null} array is written as the int {@code -1}. */
+    @Override public void writeCharArray(char[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, GridUnsafe.CHAR_ARR_OFF, val.length, val.length << 1);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} A {@code null} array is written as the int {@code -1}. */
+    @Override public void writeBooleanArray(boolean[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, GridUnsafe.BOOLEAN_ARR_OFF, val.length, val.length);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} Encodes via {@code String.getBytes()}, i.e. with the platform default charset. */
+    @Override public void writeString(String val) {
+        writeByteArray(val != null ? val.getBytes() : null);
+    }
+
+    /** {@inheritDoc} Written as {@code val.toLongArray()}; {@code null} is passed on to {@code writeLongArray}. */
+    @Override public void writeBitSet(BitSet val) {
+        writeLongArray(val != null ? val.toLongArray() : null);
+    }
+
+    /** {@inheritDoc} Writes a null flag, then both halves as longs; resumable via {@code uuidState}. */
+    @Override public void writeUuid(UUID val) {
+        switch (uuidState) {
+            case 0:
+                writeBoolean(val == null);
+
+                if (!lastFinished || val == null)
+                    return;
+
+                uuidState++;
+
+            case 1:
+                writeLong(val.getMostSignificantBits());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState++;
+
+            case 2:
+                writeLong(val.getLeastSignificantBits());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState = 0;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The value is emitted in up to four steps tracked by {@code uuidState}: a boolean null flag,
+     * the most and least significant bits of {@code globalId()}, then {@code localId()}. When a
+     * step leaves {@code lastFinished} set to {@code false} the method returns without advancing
+     * {@code uuidState}, so the next call re-enters the switch at the same step.
+     */
+    @Override public void writeIgniteUuid(IgniteUuid val) {
+        switch (uuidState) {
+            case 0:
+                writeBoolean(val == null); // Null flag; nothing else is written for a null value.
+
+                if (!lastFinished || val == null)
+                    return;
+
+                uuidState++;
+
+            case 1: // No break above: steps fall through while each write completes.
+                writeLong(val.globalId().getMostSignificantBits());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState++;
+
+            case 2:
+                writeLong(val.globalId().getLeastSignificantBits());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState++;
+
+            case 3:
+                writeLong(val.localId());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState = 0; // All parts written; state is reset for the next value.
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * A {@code null} message is encoded as the single byte {@link Byte#MIN_VALUE}. A non-null
+     * message is asked to write itself into {@code buf} between the writer's
+     * {@code beforeInnerMessageWrite()} and {@code afterInnerMessageWrite(boolean)} calls; if the
+     * buffer has no room left, nothing is written and {@code lastFinished} is set to {@code false}.
+     */
+    @Override public void writeMessage(Message msg, MessageWriter writer) {
+        if (msg == null) {
+            writeByte(Byte.MIN_VALUE);
+
+            return;
+        }
+
+        if (!buf.hasRemaining()) {
+            lastFinished = false;
+
+            return;
+        }
+
+        try {
+            writer.beforeInnerMessageWrite();
+
+            lastFinished = msg.writeTo(buf, writer);
+        }
+        finally {
+            writer.afterInnerMessageWrite(lastFinished);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType,
+        MessageWriter writer) {
+        if (arr != null) {
+            int len = arr.length;
+
+            if (arrPos == -1) {
+                writeInt(len);
+
+                if (!lastFinished)
+                    return;
+
+                arrPos = 0;
+            }
+
+            while (arrPos < len || arrCur != NULL) {
+                if (arrCur == NULL)
+                    arrCur = arr[arrPos++];
+
+                write(itemType, arrCur, writer);
+
+                if (!lastFinished)
+                    return;
+
+                arrCur = NULL;
+            }
+
+            arrPos = -1;
+        }
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType,
+        MessageWriter writer) {
+        if (col != null) {
+            if (col instanceof List && col instanceof RandomAccess)
+                writeRandomAccessList((List<T>)col, itemType, writer);
+            else {
+                if (it == null) {
+                    writeInt(col.size());
+
+                    if (!lastFinished)
+                        return;
+
+                    it = col.iterator();
+                }
+
+                while (it.hasNext() || cur != NULL) {
+                    if (cur == NULL)
+                        cur = it.next();
+
+                    write(itemType, cur, writer);
+
+                    if (!lastFinished)
+                        return;
+
+                    cur = NULL;
+                }
+
+                it = null;
+            }
+        }
+        else
+            writeInt(-1);
+    }
+
+    /**
+     * @param list List.
+     * @param itemType Component type.
+     * @param writer Writer.
+     */
+    private <T> void writeRandomAccessList(List<T> list, MessageCollectionItemType itemType, MessageWriter writer) {
+        assert list instanceof RandomAccess;
+
+        int size = list.size();
+
+        if (arrPos == -1) {
+            writeInt(size);
+
+            if (!lastFinished)
+                return;
+
+            arrPos = 0;
+        }
+
+        while (arrPos < size || arrCur != NULL) {
+            if (arrCur == NULL)
+                arrCur = list.get(arrPos++);
+
+            write(itemType, arrCur, writer);
+
+            if (!lastFinished)
+                return;
+
+            arrCur = NULL;
+        }
+
+        arrPos = -1;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType,
+        MessageCollectionItemType valType, MessageWriter writer) {
+        if (map != null) {
+            if (mapIt == null) {
+                writeInt(map.size());
+
+                if (!lastFinished)
+                    return;
+
+                mapIt = map.entrySet().iterator();
+            }
+
+            while (mapIt.hasNext() || mapCur != NULL) {
+                Map.Entry<K, V> e;
+
+                if (mapCur == NULL)
+                    mapCur = mapIt.next();
+
+                e = (Map.Entry<K, V>)mapCur;
+
+                if (!keyDone) {
+                    write(keyType, e.getKey(), writer);
+
+                    if (!lastFinished)
+                        return;
+
+                    keyDone = true;
+                }
+
+                write(valType, e.getValue(), writer);
+
+                if (!lastFinished)
+                    return;
+
+                mapCur = NULL;
+                keyDone = false;
+            }
+
+            mapIt = null;
+        }
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte readByte() {
+        lastFinished = buf.remaining() >= 1;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 1);
+
+            return GridUnsafe.getByte(heapArr, baseOff + pos);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short readShort() {
+        lastFinished = buf.remaining() >= 2;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 2);
+
+            short val = GridUnsafe.getShortAligned(heapArr, baseOff + pos);
+
+            if (BIG_ENDIAN)
+                val = Short.reverseBytes(val);
+
+            return val;
+        }
+        else
+            return 0;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Decodes a variable-length value: every byte contributes its low 7 bits to the accumulator
+     * {@code prim}, and a byte with the {@code 0x80} bit clear terminates the value. If the buffer
+     * runs out first, {@code prim} and {@code primShift} keep the partial state, {@code lastFinished}
+     * stays {@code false} and {@code 0} is returned; a later call continues from that state.
+     * <p>
+     * The decoded number is decremented by one before it is returned - presumably the writer stores
+     * {@code val + 1}; confirm against {@code writeInt}.
+     */
+    @Override public int readInt() {
+        lastFinished = false;
+
+        int val = 0;
+
+        while (buf.hasRemaining()) {
+            int pos = buf.position();
+
+            byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
+
+            buf.position(pos + 1);
+
+            prim |= ((long)b & 0x7F) << (7 * primShift); // Append the 7 payload bits of this byte.
+
+            if ((b & 0x80) == 0) { // Continuation bit clear: this was the last byte of the value.
+                lastFinished = true;
+
+                val = (int)prim;
+
+                if (val == Integer.MIN_VALUE)
+                    val = Integer.MAX_VALUE;
+                else
+                    val--;
+
+                prim = 0; // Reset the shared accumulator for the next varint.
+                primShift = 0;
+
+                break;
+            }
+            else
+                primShift++;
+        }
+
+        return val;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Decodes a variable-length value: every byte contributes its low 7 bits to the accumulator
+     * {@code prim}, and a byte with the {@code 0x80} bit clear terminates the value. If the buffer
+     * runs out first, {@code prim} and {@code primShift} keep the partial state, {@code lastFinished}
+     * stays {@code false} and {@code 0} is returned; a later call continues from that state.
+     * <p>
+     * The decoded number is decremented by one before it is returned - presumably the writer stores
+     * {@code val + 1}; confirm against {@code writeLong}.
+     */
+    @Override public long readLong() {
+        lastFinished = false;
+
+        long val = 0;
+
+        while (buf.hasRemaining()) {
+            int pos = buf.position();
+
+            byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
+
+            buf.position(pos + 1);
+
+            prim |= ((long)b & 0x7F) << (7 * primShift); // Append the 7 payload bits of this byte.
+
+            if ((b & 0x80) == 0) { // Continuation bit clear: this was the last byte of the value.
+                lastFinished = true;
+
+                val = prim;
+
+                if (val == Long.MIN_VALUE)
+                    val = Long.MAX_VALUE;
+                else
+                    val--;
+
+                prim = 0; // Reset the shared accumulator for the next varint.
+                primShift = 0;
+
+                break;
+            }
+            else
+                primShift++;
+        }
+
+        return val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float readFloat() {
+        lastFinished = buf.remaining() >= 4;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 4);
+
+            int val = GridUnsafe.getIntAligned(heapArr, baseOff + pos);
+
+            if (BIG_ENDIAN)
+                val = Integer.reverseBytes(val);
+
+            return Float.intBitsToFloat(val);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double readDouble() {
+        lastFinished = buf.remaining() >= 8;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 8);
+
+            long val = GridUnsafe.getLongAligned(heapArr, baseOff + pos);
+
+            if (BIG_ENDIAN)
+                val = Long.reverseBytes(val);
+
+            return Double.longBitsToDouble(val);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public char readChar() {
+        lastFinished = buf.remaining() >= 2;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 2);
+
+            char val = GridUnsafe.getCharAligned(heapArr, baseOff + pos);
+
+            if (BIG_ENDIAN)
+                val = Character.reverseBytes(val);
+
+            return val;
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readBoolean() {
+        lastFinished = buf.hasRemaining();
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 1);
+
+            return GridUnsafe.getBoolean(heapArr, baseOff + pos);
+        }
+        else
+            return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to {@code readArray} with a length shift of {@code 0} (one byte per element).
+     */
+    @Override public byte[] readByteArray() {
+        return readArray(BYTE_ARR_CREATOR, 0, GridUnsafe.BYTE_ARR_OFF);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to {@code readArray} with a length shift of {@code 1} (two bytes per element).
+     */
+    @Override public short[] readShortArray() {
+        return readArray(SHORT_ARR_CREATOR, 1, GridUnsafe.SHORT_ARR_OFF);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to {@code readArray} with a length shift of {@code 2} (four bytes per element).
+     */
+    @Override public int[] readIntArray() {
+        return readArray(INT_ARR_CREATOR, 2, GridUnsafe.INT_ARR_OFF);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to {@code readArray} with a length shift of {@code 3} (eight bytes per element).
+     */
+    @Override public long[] readLongArray() {
+        return readArray(LONG_ARR_CREATOR, 3, GridUnsafe.LONG_ARR_OFF);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to {@code readArray} with a length shift of {@code 2} (four bytes per element).
+     */
+    @Override public float[] readFloatArray() {
+        return readArray(FLOAT_ARR_CREATOR, 2, GridUnsafe.FLOAT_ARR_OFF);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to {@code readArray} with a length shift of {@code 3} (eight bytes per element).
+     */
+    @Override public double[] readDoubleArray() {
+        return readArray(DOUBLE_ARR_CREATOR, 3, GridUnsafe.DOUBLE_ARR_OFF);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to {@code readArray} with a length shift of {@code 1} (two bytes per element).
+     */
+    @Override public char[] readCharArray() {
+        return readArray(CHAR_ARR_CREATOR, 1, GridUnsafe.CHAR_ARR_OFF);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to {@code readArray} with a length shift of {@code 0} (one byte per element).
+     */
+    @Override public boolean[] readBooleanArray() {
+        return readArray(BOOLEAN_ARR_CREATOR, 0, GridUnsafe.BOOLEAN_ARR_OFF);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads a byte array and decodes it with {@link String#String(byte[])}; a {@code null} array
+     * (either an encoded {@code null} or an unfinished read) yields {@code null}.
+     * <p>
+     * NOTE(review): the decoding uses the platform default charset, so it presumably has to match
+     * the charset used by {@code writeString} on the sending side - confirm.
+     */
+    @Override public String readString() {
+        byte[] bytes = readByteArray();
+
+        if (bytes == null)
+            return null;
+
+        return new String(bytes);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads a {@code long[]} and converts it with {@link BitSet#valueOf(long[])}; a {@code null}
+     * array (either an encoded {@code null} or an unfinished read) yields {@code null}.
+     */
+    @Override public BitSet readBitSet() {
+        long[] words = readLongArray();
+
+        if (words == null)
+            return null;
+
+        return BitSet.valueOf(words);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Read in up to three steps tracked by {@code uuidState}: a boolean null flag, the most
+     * significant bits and the least significant bits. The halves are parked in {@code uuidMost}
+     * and {@code uuidLeast} so that a call that stops with {@code lastFinished == false} can be
+     * repeated and re-enter the switch at the unfinished step.
+     * <p>
+     * {@code null} is returned both for an encoded {@code null} and for an unfinished read; the
+     * two cases differ only in {@code lastFinished}.
+     */
+    @Override public UUID readUuid() {
+        switch (uuidState) {
+            case 0:
+                boolean isNull = readBoolean();
+
+                if (!lastFinished || isNull)
+                    return null;
+
+                uuidState++;
+
+            case 1: // No break above: steps fall through while each read completes.
+                uuidMost = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState++;
+
+            case 2:
+                uuidLeast = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState = 0; // All parts read; state is reset for the next value.
+        }
+
+        UUID val = new UUID(uuidMost, uuidLeast);
+
+        uuidMost = 0;
+        uuidLeast = 0;
+
+        return val;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Read in up to four steps tracked by {@code uuidState}: a boolean null flag, the most and
+     * least significant bits of the global ID, then the local ID. The parts are parked in
+     * {@code uuidMost}, {@code uuidLeast} and {@code uuidLocId} so that a call that stops with
+     * {@code lastFinished == false} can be repeated and re-enter the switch at the unfinished step.
+     * <p>
+     * {@code null} is returned both for an encoded {@code null} and for an unfinished read; the
+     * two cases differ only in {@code lastFinished}.
+     */
+    @Override public IgniteUuid readIgniteUuid() {
+        switch (uuidState) {
+            case 0:
+                boolean isNull = readBoolean();
+
+                if (!lastFinished || isNull)
+                    return null;
+
+                uuidState++;
+
+            case 1: // No break above: steps fall through while each read completes.
+                uuidMost = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState++;
+
+            case 2:
+                uuidLeast = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState++;
+
+            case 3:
+                uuidLocId = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState = 0; // All parts read; state is reset for the next value.
+        }
+
+        IgniteUuid val = new IgniteUuid(new UUID(uuidMost, uuidLeast), uuidLocId);
+
+        uuidMost = 0;
+        uuidLeast = 0;
+        uuidLocId = 0;
+
+        return val;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * First consumes a one-byte type: {@link Byte#MIN_VALUE} stands for a {@code null} message, any
+     * other value is passed to {@code msgFactory.create(type)}. {@code msgTypeDone} and {@code msg}
+     * remember that step, so a repeated call after an unfinished read goes straight to the body.
+     * The body is read by {@code msg.readFrom(buf, reader)} between the reader's
+     * {@code beforeInnerMessageRead()} and {@code afterInnerMessageRead(boolean)} calls.
+     * <p>
+     * Returns the message once it is complete (or {@code null} for an encoded {@code null}); an
+     * unfinished read also returns {@code null}, with {@code lastFinished} set to {@code false}.
+     */
+    @SuppressWarnings("unchecked")
+    @Override public <T extends Message> T readMessage(MessageReader reader) {
+        if (!msgTypeDone) {
+            if (!buf.hasRemaining()) {
+                lastFinished = false;
+
+                return null;
+            }
+
+            byte type = readByte();
+
+            msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type); // MIN_VALUE marks a null message.
+
+            msgTypeDone = true;
+        }
+
+        if (msg != null) {
+            try {
+                reader.beforeInnerMessageRead();
+
+                reader.setCurrentReadClass(msg.getClass());
+
+                lastFinished = msg.readFrom(buf, reader);
+            }
+            finally {
+                reader.afterInnerMessageRead(lastFinished);
+            }
+        }
+        else
+            lastFinished = true; // A null message has no body, so the read is complete.
+
+        if (lastFinished) {
+            Message msg0 = msg;
+
+            msgTypeDone = false; // Reset so the next call starts with a fresh type byte.
+            msg = null;
+
+            return (T)msg0;
+        }
+        else
+            return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The element count is read first ({@code readSize == -1} means it is still unknown). A
+     * negative count skips allocation, so {@code null} is returned with {@code lastFinished} left
+     * as set by {@code readInt}. Otherwise the array is allocated once - typed by {@code itemCls}
+     * when given, else {@code Object[]} - and filled starting at {@code readItems}, which lets a
+     * call that stopped with {@code lastFinished == false} be repeated to continue.
+     * <p>
+     * An unfinished read returns {@code null} and keeps {@code readSize}, {@code readItems} and
+     * {@code objArr} for the next call.
+     */
+    @SuppressWarnings("unchecked")
+    @Override public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls,
+        MessageReader reader) {
+        if (readSize == -1) {
+            int size = readInt();
+
+            if (!lastFinished)
+                return null;
+
+            readSize = size;
+        }
+
+        if (readSize >= 0) {
+            if (objArr == null)
+                objArr = itemCls != null ? (Object[])Array.newInstance(itemCls, readSize) : new Object[readSize];
+
+            for (int i = readItems; i < readSize; i++) { // Resume after the items already stored.
+                Object item = read(itemType, reader);
+
+                if (!lastFinished)
+                    return null;
+
+                objArr[i] = item;
+
+                readItems++;
+            }
+        }
+
+        readSize = -1;
+        readItems = 0;
+        cur = null; // NOTE(review): cur is not read here; writeCollection compares it to NULL - confirm intent.
+
+        T[] objArr0 = (T[])objArr;
+
+        objArr = null;
+
+        return objArr0;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The element count is read first ({@code readSize == -1} means it is still unknown). A
+     * negative count skips allocation, so {@code null} is returned with {@code lastFinished} left
+     * as set by {@code readInt}. Otherwise an {@link ArrayList} is created once and filled starting
+     * at {@code readItems}, which lets a call that stopped with {@code lastFinished == false} be
+     * repeated to continue.
+     * <p>
+     * An unfinished read returns {@code null} and keeps {@code readSize}, {@code readItems} and
+     * {@code col} for the next call.
+     */
+    @SuppressWarnings("unchecked")
+    @Override public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType,
+        MessageReader reader) {
+        if (readSize == -1) {
+            int size = readInt();
+
+            if (!lastFinished)
+                return null;
+
+            readSize = size;
+        }
+
+        if (readSize >= 0) {
+            if (col == null)
+                col = new ArrayList<>(readSize);
+
+            for (int i = readItems; i < readSize; i++) { // Resume after the items already added.
+                Object item = read(itemType, reader);
+
+                if (!lastFinished)
+                    return null;
+
+                col.add(item);
+
+                readItems++;
+            }
+        }
+
+        readSize = -1;
+        readItems = 0;
+        cur = null; // NOTE(review): cur is not read here; writeCollection compares it to NULL - confirm intent.
+
+        C col0 = (C)col;
+
+        col = null;
+
+        return col0;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The entry count is read first ({@code readSize == -1} means it is still unknown). A negative
+     * count skips allocation, so {@code null} is returned with {@code lastFinished} left as set by
+     * {@code readInt}. Otherwise the map is created once through {@code U.newLinkedHashMap} or
+     * {@code U.newHashMap}, depending on {@code linked}, and each entry is read as key then value.
+     * {@code mapCur} holds a key whose value is still outstanding and {@code keyDone} records that
+     * the key was read, so a call that stopped with {@code lastFinished == false} can be repeated.
+     * <p>
+     * An unfinished read returns {@code null} and keeps the partial state for the next call.
+     */
+    @SuppressWarnings("unchecked")
+    @Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType,
+        MessageCollectionItemType valType, boolean linked, MessageReader reader) {
+        if (readSize == -1) {
+            int size = readInt();
+
+            if (!lastFinished)
+                return null;
+
+            readSize = size;
+        }
+
+        if (readSize >= 0) {
+            if (map == null)
+                map = linked ? U.newLinkedHashMap(readSize) : U.newHashMap(readSize);
+
+            for (int i = readItems; i < readSize; i++) { // Resume after the entries already stored.
+                if (!keyDone) {
+                    Object key = read(keyType, reader);
+
+                    if (!lastFinished)
+                        return null;
+
+                    mapCur = key; // Park the key until its value has been read.
+                    keyDone = true;
+                }
+
+                Object val = read(valType, reader);
+
+                if (!lastFinished)
+                    return null;
+
+                map.put(mapCur, val);
+
+                keyDone = false;
+
+                readItems++;
+            }
+        }
+
+        readSize = -1;
+        readItems = 0;
+        mapCur = null; // NOTE(review): writeMap compares mapCur to NULL, not null - confirm intent.
+
+        M map0 = (M)map;
+
+        map = null;
+
+        return map0;
+    }
+
+    /**
+     * Writes a primitive array as its element count followed by a raw copy of its contents.
+     * <p>
+     * {@code arrOff} tracks progress: {@code -1} means the count has not been written yet, any
+     * other value is the number of content bytes already copied. When the buffer cannot take the
+     * rest, as much as fits is copied, {@code arrOff} is advanced and {@code false} is returned so
+     * that a later call continues from there.
+     *
+     * @param arr Array.
+     * @param off Offset.
+     * @param len Length.
+     * @param bytes Length in bytes.
+     * @return Whether array was fully written.
+     */
+    private boolean writeArray(Object arr, long off, int len, int bytes) {
+        assert arr != null;
+        assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
+        assert off > 0;
+        assert len >= 0;
+        assert bytes >= 0;
+        assert bytes >= arrOff;
+
+        if (arrOff == -1) { // Header (element count) not written yet.
+            writeInt(len);
+
+            if (!lastFinished)
+                return false;
+
+            arrOff = 0;
+        }
+
+        int toWrite = bytes - arrOff;
+        int pos = buf.position();
+        int remaining = buf.remaining();
+
+        if (toWrite <= remaining) { // The rest of the array fits into the buffer.
+            if (toWrite > 0) {
+                GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);
+
+                buf.position(pos + toWrite);
+            }
+
+            arrOff = -1; // Done; reset for the next array.
+
+            return true;
+        }
+        else {
+            if (remaining > 0) {
+                GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
+
+                buf.position(pos + remaining);
+
+                arrOff += remaining; // Remember how many content bytes are already out.
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * Reads a primitive array written as an element count followed by its raw contents.
+     * <p>
+     * While {@code tmpArr} is {@code null} the count is read first: {@code -1} yields {@code null},
+     * {@code 0} yields an empty array, and any other value is passed to {@code creator.create(len)}
+     * with a byte size of {@code len << lenShift}. The contents are then copied from the buffer;
+     * {@code tmpArrOff} counts the bytes already copied so that an unfinished read can continue on
+     * a later call.
+     * <p>
+     * NOTE(review): counts other than {@code -1} and {@code 0} are not validated here, so a negative
+     * or overflowing count from the stream reaches {@code creator.create} and the shift unchecked.
+     *
+     * @param creator Array creator.
+     * @param lenShift Array length shift size.
+     * @param off Base offset.
+     * @return Array, or {@code null} if {@code null} was encoded or the array was not fully read
+     *      (the two cases differ in {@code lastFinished}).
+     */
+    @SuppressWarnings("unchecked")
+    private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) {
+        assert creator != null;
+
+        if (tmpArr == null) { // No array in progress: start with the element count.
+            int len = readInt();
+
+            if (!lastFinished)
+                return null;
+
+            switch (len) {
+                case -1: // Encoded null array.
+                    lastFinished = true;
+
+                    return null;
+
+                case 0:
+                    lastFinished = true;
+
+                    return creator.create(0);
+
+                default:
+                    tmpArr = creator.create(len);
+                    tmpArrBytes = len << lenShift; // Total content size in bytes.
+            }
+        }
+
+        int toRead = tmpArrBytes - tmpArrOff;
+        int remaining = buf.remaining();
+        int pos = buf.position();
+
+        lastFinished = toRead <= remaining;
+
+        if (lastFinished) {
+            GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
+
+            buf.position(pos + toRead);
+
+            T arr = (T)tmpArr;
+
+            tmpArr = null; // Reset the partial-read state for the next array.
+            tmpArrBytes = 0;
+            tmpArrOff = 0;
+
+            return arr;
+        }
+        else {
+            GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
+
+            buf.position(pos + remaining);
+
+            tmpArrOff += remaining; // Remember how many content bytes are already in.
+
+            return null;
+        }
+    }
+
+    /**
+     * Writes a single collection, array or map item by dispatching on its declared item type to
+     * the matching typed write method; the value is cast to the Java type that method expects.
+     * <p>
+     * NOTE(review): the {@code MSG} branch calls the writer's {@code beforeInnerMessageWrite()} and
+     * {@code afterInnerMessageWrite(boolean)} around {@code writeMessage}, which invokes the same
+     * pair itself when the buffer has room, so the hooks nest - confirm this is intended.
+     *
+     * @param type Type.
+     * @param val Value.
+     * @param writer Writer.
+     * @throws IllegalArgumentException If {@code type} is not one of the handled item types.
+     */
+    private void write(MessageCollectionItemType type, Object val, MessageWriter writer) {
+        switch (type) {
+            case BYTE:
+                writeByte((Byte)val);
+
+                break;
+
+            case SHORT:
+                writeShort((Short)val);
+
+                break;
+
+            case INT:
+                writeInt((Integer)val);
+
+                break;
+
+            case LONG:
+                writeLong((Long)val);
+
+                break;
+
+            case FLOAT:
+                writeFloat((Float)val);
+
+                break;
+
+            case DOUBLE:
+                writeDouble((Double)val);
+
+                break;
+
+            case CHAR:
+                writeChar((Character)val);
+
+                break;
+
+            case BOOLEAN:
+                writeBoolean((Boolean)val);
+
+                break;
+
+            case BYTE_ARR:
+                writeByteArray((byte[])val);
+
+                break;
+
+            case SHORT_ARR:
+                writeShortArray((short[])val);
+
+                break;
+
+            case INT_ARR:
+                writeIntArray((int[])val);
+
+                break;
+
+            case LONG_ARR:
+                writeLongArray((long[])val);
+
+                break;
+
+            case FLOAT_ARR:
+                writeFloatArray((float[])val);
+
+                break;
+
+            case DOUBLE_ARR:
+                writeDoubleArray((double[])val);
+
+                break;
+
+            case CHAR_ARR:
+                writeCharArray((char[])val);
+
+                break;
+
+            case BOOLEAN_ARR:
+                writeBooleanArray((boolean[])val);
+
+                break;
+
+            case STRING:
+                writeString((String)val);
+
+                break;
+
+            case BIT_SET:
+                writeBitSet((BitSet)val);
+
+                break;
+
+            case UUID:
+                writeUuid((UUID)val);
+
+                break;
+
+            case IGNITE_UUID:
+                writeIgniteUuid((IgniteUuid)val);
+
+                break;
+
+            case MSG:
+                try {
+                    if (val != null) // The hooks are skipped for a null message.
+                        writer.beforeInnerMessageWrite();
+
+                    writeMessage((Message)val, writer);
+                }
+                finally {
+                    if (val != null)
+                        writer.afterInnerMessageWrite(lastFinished);
+                }
+
+                break;
+
+            default:
+                throw new IllegalArgumentException("Unknown type: " + type);
+        }
+    }
+
+    /**
+     * Reads a single collection, array or map item by dispatching on its declared item type to
+     * the matching typed read method. Primitive results are boxed by the {@code Object} return
+     * type. Whether the item was read completely is reported through {@code lastFinished}, which
+     * the called read method sets.
+     *
+     * @param type Type.
+     * @param reader Reader; only used for {@code MSG} items.
+     * @return Value.
+     * @throws IllegalArgumentException If {@code type} is not one of the handled item types.
+     */
+    private Object read(MessageCollectionItemType type, MessageReader reader) {
+        switch (type) {
+            case BYTE:
+                return readByte();
+
+            case SHORT:
+                return readShort();
+
+            case INT:
+                return readInt();
+
+            case LONG:
+                return readLong();
+
+            case FLOAT:
+                return readFloat();
+
+            case DOUBLE:
+                return readDouble();
+
+            case CHAR:
+                return readChar();
+
+            case BOOLEAN:
+                return readBoolean();
+
+            case BYTE_ARR:
+                return readByteArray();
+
+            case SHORT_ARR:
+                return readShortArray();
+
+            case INT_ARR:
+                return readIntArray();
+
+            case LONG_ARR:
+                return readLongArray();
+
+            case FLOAT_ARR:
+                return readFloatArray();
+
+            case DOUBLE_ARR:
+                return readDoubleArray();
+
+            case CHAR_ARR:
+                return readCharArray();
+
+            case BOOLEAN_ARR:
+                return readBooleanArray();
+
+            case STRING:
+                return readString();
+
+            case BIT_SET:
+                return readBitSet();
+
+            case UUID:
+                return readUuid();
+
+            case IGNITE_UUID:
+                return readIgniteUuid();
+
+            case MSG:
+                return readMessage(reader);
+
+            default:
+                throw new IllegalArgumentException("Unknown type: " + type);
+        }
+    }
+
+    /**
+     * Array creator: allocates the result array for {@code readArray}.
+     */
+    private interface ArrayCreator<T> {
+        /**
+         * @param len Array length; {@code readArray} passes {@code 0} or the element count decoded
+         *      from the stream.
+         * @return New array.
+         */
+        public T create(int len);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 42f8dae..d4e2a43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -111,7 +111,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     public static final String DIRECT_PROTO_VER_ATTR = "comm.direct.proto.ver";
 
     /** Direct protocol version. */
-    public static final byte DIRECT_PROTO_VER = 2;
+    public static final byte DIRECT_PROTO_VER = 3;
 
     /** Listeners by topic. */
     private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 845e204..3a7bc8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -103,9 +103,6 @@ import static org.jsr166.ConcurrentLinkedDeque8.Node;
  * Cache eviction manager.
  */
 public class GridCacheEvictionManager extends GridCacheManagerAdapter {
-    /** Unsafe instance. */
-    private static final sun.misc.Unsafe unsafe = GridUnsafe.unsafe();
-
     /** Attribute name used to queue node in entry metadata. */
     private static final int META_KEY = GridMetadataAwareAdapter.EntryKey.CACHE_EVICTION_MANAGER_KEY.key();
 
@@ -985,7 +982,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
                     continue;
 
                 // Lock entry.
-                unsafe.monitorEnter(entry);
+                GridUnsafe.monitorEnter(entry);
 
                 locked.add(entry);
 
@@ -1028,7 +1025,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
             for (ListIterator<GridCacheEntryEx> it = locked.listIterator(locked.size()); it.hasPrevious();) {
                 GridCacheEntryEx e = it.previous();
 
-                unsafe.monitorExit(e);
+                GridUnsafe.monitorExit(e);
             }
 
             // Remove entries and fire events outside the locks.

http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java
index ea036af..82e115c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java
@@ -23,7 +23,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
-import sun.misc.Unsafe;
 
 /**
  * GridCacheSwapEntry over offheap pointer.
@@ -41,9 +40,6 @@ import sun.misc.Unsafe;
  */
 public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
     /** */
-    private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
-    /** */
     private final long ptr;
 
     /** */
@@ -70,17 +66,17 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
 
         long readPtr = ptr + GridCacheSwapEntryImpl.VERSION_OFFSET;
 
-        boolean verEx = UNSAFE.getByte(readPtr++) != 0;
+        boolean verEx = GridUnsafe.getByte(readPtr++) != 0;
 
         ver = U.readVersion(readPtr, verEx);
 
         readPtr += verEx ? GridCacheSwapEntryImpl.VERSION_EX_SIZE : GridCacheSwapEntryImpl.VERSION_SIZE;
 
-        type = UNSAFE.getByte(readPtr + 4);
+        type = GridUnsafe.getByte(readPtr + 4);
 
         valPtr = readPtr;
 
-        assert (ptr + size) > (UNSAFE.getInt(valPtr) + valPtr + 5);
+        assert (ptr + size) > (GridUnsafe.getInt(valPtr) + valPtr + 5);
     }
 
     /**
@@ -94,11 +90,11 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
 
         ptr += GridCacheSwapEntryImpl.VERSION_OFFSET; // Skip ttl, expire time.
 
-        boolean verEx = UNSAFE.getByte(ptr++) != 0;
+        boolean verEx = GridUnsafe.getByte(ptr++) != 0;
 
         ptr += verEx ? GridCacheSwapEntryImpl.VERSION_EX_SIZE : GridCacheSwapEntryImpl.VERSION_SIZE;
 
-        assert (ptr + size) > (UNSAFE.getInt(ptr) + ptr + 5);
+        assert (ptr + size) > (GridUnsafe.getInt(ptr) + ptr + 5);
 
         return ptr;
     }
@@ -108,7 +104,7 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
      * @return TTL.
      */
     public static long timeToLive(long ptr) {
-        return UNSAFE.getLong(ptr);
+        return GridUnsafe.getLong(ptr);
     }
 
     /**
@@ -116,7 +112,7 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
      * @return Expire time.
      */
     public static long expireTime(long ptr) {
-        return UNSAFE.getLong(ptr + GridCacheSwapEntryImpl.EXPIRE_TIME_OFFSET);
+        return GridUnsafe.getLong(ptr + GridCacheSwapEntryImpl.EXPIRE_TIME_OFFSET);
     }
 
     /**
@@ -126,7 +122,7 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
     public static GridCacheVersion version(long ptr) {
         long addr = ptr + GridCacheSwapEntryImpl.VERSION_OFFSET;
 
-        boolean verEx = UNSAFE.getByte(addr) != 0;
+        boolean verEx = GridUnsafe.getByte(addr) != 0;
 
         addr++;
 
@@ -165,12 +161,12 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
 
     /** {@inheritDoc} */
     @Override public long ttl() {
-        return UNSAFE.getLong(ptr);
+        return GridUnsafe.getLong(ptr);
     }
 
     /** {@inheritDoc} */
     @Override public long expireTime() {
-        return UNSAFE.getLong(ptr + GridCacheSwapEntryImpl.EXPIRE_TIME_OFFSET);
+        return GridUnsafe.getLong(ptr + GridCacheSwapEntryImpl.EXPIRE_TIME_OFFSET);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
index 6b1266f..4a69822 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
@@ -26,19 +26,12 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
-import sun.misc.Unsafe;
 
 /**
  * Swap entry.
  */
 public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
     /** */
-    private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
-    /** */
-    private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
-
-    /** */
     static final int EXPIRE_TIME_OFFSET = 8;
 
     /** */
@@ -108,7 +101,7 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
      * @return TTL.
      */
     public static long timeToLive(byte[] bytes) {
-        return UNSAFE.getLong(bytes, BYTE_ARR_OFF);
+        return GridUnsafe.getLongAligned(bytes, GridUnsafe.BYTE_ARR_OFF);
     }
 
     /**
@@ -116,7 +109,7 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
      * @return Expire time.
      */
     public static long expireTime(byte[] bytes) {
-        return UNSAFE.getLong(bytes, BYTE_ARR_OFF + EXPIRE_TIME_OFFSET);
+        return GridUnsafe.getLongAligned(bytes, GridUnsafe.BYTE_ARR_OFF + EXPIRE_TIME_OFFSET);
     }
 
     /**
@@ -124,9 +117,9 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
      * @return Version.
      */
     public static GridCacheVersion version(byte[] bytes) {
-        long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
+        long off = GridUnsafe.BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
 
-        boolean verEx = UNSAFE.getByte(bytes, off++) != 0;
+        boolean verEx = GridUnsafe.getByte(bytes, off++) != 0;
 
         return U.readVersion(bytes, off, verEx);
     }
@@ -136,21 +129,21 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
      * @return Value if value is byte array, otherwise {@code null}.
      */
     @Nullable public static IgniteBiTuple<byte[], Byte> getValue(byte[] bytes) {
-        long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
+        long off = GridUnsafe.BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
 
-        boolean verEx = UNSAFE.getByte(bytes, off++) != 0;
+        boolean verEx = GridUnsafe.getByte(bytes, off++) != 0;
 
         off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
 
-        int arrLen = UNSAFE.getInt(bytes, off);
+        int arrLen = GridUnsafe.getIntAligned(bytes, off);
 
         off += 4;
 
-        byte type = UNSAFE.getByte(bytes, off++);
+        byte type = GridUnsafe.getByte(bytes, off++);
 
         byte[] valBytes = new byte[arrLen];
 
-        UNSAFE.copyMemory(bytes, off, valBytes, BYTE_ARR_OFF, arrLen);
+        GridUnsafe.copyMemory(bytes, off, valBytes, GridUnsafe.BYTE_ARR_OFF, arrLen);
 
         return new IgniteBiTuple<>(valBytes, type);
     }
@@ -235,25 +228,25 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
 
         byte[] arr = new byte[size];
 
-        long off = BYTE_ARR_OFF;
+        long off = GridUnsafe.BYTE_ARR_OFF;
 
-        UNSAFE.putLong(arr, off, ttl);
+        GridUnsafe.putLongAligned(arr, off, ttl);
 
         off += 8;
 
-        UNSAFE.putLong(arr, off, expireTime);
+        GridUnsafe.putLongAligned(arr, off, expireTime);
 
         off += 8;
 
         off = U.writeVersion(arr, off, ver);
 
-        UNSAFE.putInt(arr, off, len);
+        GridUnsafe.putIntAligned(arr, off, len);
 
         off += 4;
 
-        UNSAFE.putByte(arr, off++, type);
+        GridUnsafe.putByte(arr, off++, type);
 
-        UNSAFE.copyMemory(valBytes.array(), BYTE_ARR_OFF, arr, off, len);
+        GridUnsafe.copyMemory(valBytes.array(), GridUnsafe.BYTE_ARR_OFF, arr, off, len);
 
         off += len;
 
@@ -271,21 +264,21 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
      */
     public static GridCacheSwapEntryImpl unmarshal(byte[] arr, boolean valOnly) {
         if (valOnly) {
-            long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
+            long off = GridUnsafe.BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
 
-            boolean verEx = UNSAFE.getByte(arr, off++) != 0;
+            boolean verEx = GridUnsafe.getByte(arr, off++) != 0;
 
             off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
 
-            int arrLen = UNSAFE.getInt(arr, off);
+            int arrLen = GridUnsafe.getIntAligned(arr, off);
 
             off += 4;
 
-            byte type = UNSAFE.getByte(arr, off++);
+            byte type = GridUnsafe.getByte(arr, off++);
 
             byte[] valBytes = new byte[arrLen];
 
-            UNSAFE.copyMemory(arr, off, valBytes, BYTE_ARR_OFF, arrLen);
+            GridUnsafe.copyMemory(arr, off, valBytes, GridUnsafe.BYTE_ARR_OFF, arrLen);
 
             return new GridCacheSwapEntryImpl(ByteBuffer.wrap(valBytes),
                 type,
@@ -296,31 +289,31 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
                 null);
         }
 
-        long off = BYTE_ARR_OFF;
+        long off = GridUnsafe.BYTE_ARR_OFF;
 
-        long ttl = UNSAFE.getLong(arr, off);
+        long ttl = GridUnsafe.getLongAligned(arr, off);
 
         off += 8;
 
-        long expireTime = UNSAFE.getLong(arr, off);
+        long expireTime = GridUnsafe.getLongAligned(arr, off);
 
         off += 8;
 
-        boolean verEx = UNSAFE.getBoolean(arr, off++);
+        boolean verEx = GridUnsafe.getBoolean(arr, off++);
 
         GridCacheVersion ver = U.readVersion(arr, off, verEx);
 
         off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
 
-        int arrLen = UNSAFE.getInt(arr, off);
+        int arrLen = GridUnsafe.getIntAligned(arr, off);
 
         off += 4;
 
-        byte type = UNSAFE.getByte(arr, off++);
+        byte type = GridUnsafe.getByte(arr, off++);
 
         byte[] valBytes = new byte[arrLen];
 
-        UNSAFE.copyMemory(arr, off, valBytes, BYTE_ARR_OFF, arrLen);
+        GridUnsafe.copyMemory(arr, off, valBytes, GridUnsafe.BYTE_ARR_OFF, arrLen);
 
         off += arrLen;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index c9d6dad..9d95c2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -48,7 +48,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.binary.BinaryMetadata;
@@ -56,13 +56,14 @@ import org.apache.ignite.internal.binary.BinaryMetadataHandler;
 import org.apache.ignite.internal.binary.BinaryObjectEx;
 import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl;
+import org.apache.ignite.internal.binary.BinaryPrimitives;
 import org.apache.ignite.internal.binary.BinaryTypeImpl;
-import org.apache.ignite.internal.binary.GridBinaryMarshaller;
-import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
 import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapter;
@@ -95,7 +96,6 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.marshaller.Marshaller;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import sun.misc.Unsafe;
 
 /**
  * Binary processor implementation.
@@ -103,9 +103,6 @@ import sun.misc.Unsafe;
 public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorImpl implements
     CacheObjectBinaryProcessor {
     /** */
-    private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
-    /** */
     private final CountDownLatch startLatch = new CountDownLatch(1);
 
     /** */
@@ -356,11 +353,11 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     public Object unmarshal(long ptr, boolean forceHeap) throws BinaryObjectException {
         assert ptr > 0 : ptr;
 
-        int size = UNSAFE.getInt(ptr);
+        int size = GridUnsafe.getInt(ptr);
 
         ptr += 4;
 
-        byte type = UNSAFE.getByte(ptr++);
+        byte type = GridUnsafe.getByte(ptr++);
 
         if (type != CacheObject.TYPE_BYTE_ARR) {
             assert size > 0 : size;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 81fd5d6..966c10a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -137,9 +137,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     private static final int DEFERRED_UPDATE_RESPONSE_TIMEOUT =
         Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
 
-    /** Unsafe instance. */
-    private static final sun.misc.Unsafe UNSAFE = GridUnsafe.unsafe();
-
     /** Update reply closure. */
     private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
 
@@ -2364,10 +2361,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 try {
                     GridDhtCacheEntry entry = entryExx(key, topVer);
 
-                    UNSAFE.monitorEnter(entry);
+                    GridUnsafe.monitorEnter(entry);
 
                     if (entry.obsolete())
-                        UNSAFE.monitorExit(entry);
+                        GridUnsafe.monitorExit(entry);
                     else
                         return Collections.singletonList(entry);
                 }
@@ -2407,13 +2404,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     if (entry == null)
                         continue;
 
-                    UNSAFE.monitorEnter(entry);
+                    GridUnsafe.monitorEnter(entry);
 
                     if (entry.obsolete()) {
                         // Unlock all locked.
                         for (int j = 0; j <= i; j++) {
                             if (locked.get(j) != null)
-                                UNSAFE.monitorExit(locked.get(j));
+                                GridUnsafe.monitorExit(locked.get(j));
                         }
 
                         // Clear entries.
@@ -2462,7 +2459,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             // That's why releasing locks in the finally block..
             for (GridCacheMapEntry entry : locked) {
                 if (entry != null)
-                    UNSAFE.monitorExit(entry);
+                    GridUnsafe.monitorExit(entry);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 6130ead..c4877f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -89,9 +89,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Unsafe instance. */
-    private static final sun.misc.Unsafe UNSAFE = GridUnsafe.unsafe();
-
     /** */
     private GridCachePreloader preldr;
 
@@ -1506,12 +1503,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
             for (int i = 0; i < locked.size(); i++) {
                 GridCacheEntryEx entry = locked.get(i);
 
-                UNSAFE.monitorEnter(entry);
+                GridUnsafe.monitorEnter(entry);
 
                 if (entry.obsolete()) {
                     // Unlock all locked.
                     for (int j = 0; j <= i; j++)
-                        UNSAFE.monitorExit(locked.get(j));
+                        GridUnsafe.monitorExit(locked.get(j));
 
                     // Clear entries.
                     locked.clear();
@@ -1542,7 +1539,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
      */
     private void unlockEntries(Iterable<GridCacheEntryEx> locked) {
         for (GridCacheEntryEx entry : locked)
-            UNSAFE.monitorExit(entry);
+            GridUnsafe.monitorExit(entry);
 
         AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 5b764b6..54dd69e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -51,9 +51,6 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
  *
  */
 public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter implements IgniteCacheObjectProcessor {
-    /** */
-    private static final sun.misc.Unsafe UNSAFE = GridUnsafe.unsafe();
-
     /** Immutable classes. */
     private static final Collection<Class<?>> IMMUTABLE_CLS = new HashSet<>();
 
@@ -138,9 +135,9 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
     {
         assert valPtr != 0;
 
-        int size = UNSAFE.getInt(valPtr);
+        int size = GridUnsafe.getInt(valPtr);
 
-        byte type = UNSAFE.getByte(valPtr + 4);
+        byte type = GridUnsafe.getByte(valPtr + 4);
 
         byte[] bytes = U.copyMemory(valPtr + 5, size);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
index 2f6ad5c..b16f42f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.memory;
 
-import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.UNSAFE;
+import org.apache.ignite.internal.util.GridUnsafe;
 
 /**
  * Interop output stream implementation working with BIG ENDIAN architecture.
@@ -46,7 +46,7 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl
         long startPos = data + pos;
 
         for (short item : val) {
-            UNSAFE.putShort(startPos, Short.reverseBytes(item));
+            GridUnsafe.putShort(startPos, Short.reverseBytes(item));
 
             startPos += 2;
         }
@@ -68,7 +68,7 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl
         long startPos = data + pos;
 
         for (char item : val) {
-            UNSAFE.putChar(startPos, Character.reverseBytes(item));
+            GridUnsafe.putChar(startPos, Character.reverseBytes(item));
 
             startPos += 2;
         }
@@ -90,7 +90,7 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl
         long startPos = data + pos;
 
         for (int item : val) {
-            UNSAFE.putInt(startPos, Integer.reverseBytes(item));
+            GridUnsafe.putInt(startPos, Integer.reverseBytes(item));
 
             startPos += 4;
         }
@@ -117,7 +117,7 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl
         long startPos = data + pos;
 
         for (float item : val) {
-            UNSAFE.putInt(startPos, Integer.reverseBytes(Float.floatToIntBits(item)));
+            GridUnsafe.putInt(startPos, Integer.reverseBytes(Float.floatToIntBits(item)));
 
             startPos += 4;
         }
@@ -139,7 +139,7 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl
         long startPos = data + pos;
 
         for (long item : val) {
-            UNSAFE.putLong(startPos, Long.reverseBytes(item));
+            GridUnsafe.putLong(startPos, Long.reverseBytes(item));
 
             startPos += 8;
         }
@@ -156,7 +156,7 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl
         long startPos = data + pos;
 
         for (double item : val) {
-            UNSAFE.putLong(startPos, Long.reverseBytes(Double.doubleToLongBits(item)));
+            GridUnsafe.putLong(startPos, Long.reverseBytes(Double.doubleToLongBits(item)));
 
             startPos += 8;
         }