You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/01/20 14:19:27 UTC
[3/4] ignite git commit: ignite-2080 Data alignment issues with Unsafe
http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
index 1a4c4bb..444e367 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
@@ -38,7 +38,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import sun.misc.Unsafe;
import sun.nio.ch.DirectBuffer;
/**
@@ -46,33 +45,6 @@ import sun.nio.ch.DirectBuffer;
*/
public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
/** */
- private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
- /** */
- private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
-
- /** */
- private static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
-
- /** */
- private static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
-
- /** */
- private static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
-
- /** */
- private static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
-
- /** */
- private static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
-
- /** */
- private static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
-
- /** */
- private static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
-
- /** */
private static final byte[] BYTE_ARR_EMPTY = new byte[0];
/** */
@@ -322,7 +294,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
this.buf = buf;
heapArr = buf.isDirect() ? null : buf.array();
- baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : BYTE_ARR_OFF;
+ baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : GridUnsafe.BYTE_ARR_OFF;
}
}
@@ -343,7 +315,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
if (lastFinished) {
int pos = buf.position();
- UNSAFE.putByte(heapArr, baseOff + pos, val);
+ GridUnsafe.putByte(heapArr, baseOff + pos, val);
buf.position(pos + 1);
}
@@ -356,7 +328,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
if (lastFinished) {
int pos = buf.position();
- UNSAFE.putShort(heapArr, baseOff + pos, val);
+ GridUnsafe.putShortAligned(heapArr, baseOff + pos, val);
buf.position(pos + 2);
}
@@ -377,12 +349,12 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
while ((val & 0xFFFF_FF80) != 0) {
byte b = (byte)(val | 0x80);
- UNSAFE.putByte(heapArr, baseOff + pos++, b);
+ GridUnsafe.putByte(heapArr, baseOff + pos++, b);
val >>>= 7;
}
- UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val);
+ GridUnsafe.putByte(heapArr, baseOff + pos++, (byte)val);
buf.position(pos);
}
@@ -403,12 +375,12 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
byte b = (byte)(val | 0x80);
- UNSAFE.putByte(heapArr, baseOff + pos++, b);
+ GridUnsafe.putByte(heapArr, baseOff + pos++, b);
val >>>= 7;
}
- UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val);
+ GridUnsafe.putByte(heapArr, baseOff + pos++, (byte)val);
buf.position(pos);
}
@@ -421,7 +393,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
if (lastFinished) {
int pos = buf.position();
- UNSAFE.putFloat(heapArr, baseOff + pos, val);
+ GridUnsafe.putFloatAligned(heapArr, baseOff + pos, val);
buf.position(pos + 4);
}
@@ -434,7 +406,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
if (lastFinished) {
int pos = buf.position();
- UNSAFE.putDouble(heapArr, baseOff + pos, val);
+ GridUnsafe.putDoubleAligned(heapArr, baseOff + pos, val);
buf.position(pos + 8);
}
@@ -447,7 +419,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
if (lastFinished) {
int pos = buf.position();
- UNSAFE.putChar(heapArr, baseOff + pos, val);
+ GridUnsafe.putCharAligned(heapArr, baseOff + pos, val);
buf.position(pos + 2);
}
@@ -460,7 +432,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
if (lastFinished) {
int pos = buf.position();
- UNSAFE.putBoolean(heapArr, baseOff + pos, val);
+ GridUnsafe.putBoolean(heapArr, baseOff + pos, val);
buf.position(pos + 1);
}
@@ -469,7 +441,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override public void writeByteArray(byte[] val) {
if (val != null)
- lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length);
+ lastFinished = writeArray(val, GridUnsafe.BYTE_ARR_OFF, val.length, val.length);
else
writeInt(-1);
}
@@ -477,7 +449,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override public void writeByteArray(byte[] val, long off, int len) {
if (val != null)
- lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len);
+ lastFinished = writeArray(val, GridUnsafe.BYTE_ARR_OFF + off, len, len);
else
writeInt(-1);
}
@@ -485,7 +457,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override public void writeShortArray(short[] val) {
if (val != null)
- lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1);
+ lastFinished = writeArray(val, GridUnsafe.SHORT_ARR_OFF, val.length, val.length << 1);
else
writeInt(-1);
}
@@ -493,7 +465,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override public void writeIntArray(int[] val) {
if (val != null)
- lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2);
+ lastFinished = writeArray(val, GridUnsafe.INT_ARR_OFF, val.length, val.length << 2);
else
writeInt(-1);
}
@@ -501,7 +473,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override public void writeLongArray(long[] val) {
if (val != null)
- lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3);
+ lastFinished = writeArray(val, GridUnsafe.LONG_ARR_OFF, val.length, val.length << 3);
else
writeInt(-1);
}
@@ -509,7 +481,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override public void writeFloatArray(float[] val) {
if (val != null)
- lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2);
+ lastFinished = writeArray(val, GridUnsafe.FLOAT_ARR_OFF, val.length, val.length << 2);
else
writeInt(-1);
}
@@ -517,7 +489,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override public void writeDoubleArray(double[] val) {
if (val != null)
- lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3);
+ lastFinished = writeArray(val, GridUnsafe.DOUBLE_ARR_OFF, val.length, val.length << 3);
else
writeInt(-1);
}
@@ -525,7 +497,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override public void writeCharArray(char[] val) {
if (val != null)
- lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1);
+ lastFinished = writeArray(val, GridUnsafe.CHAR_ARR_OFF, val.length, val.length << 1);
else
writeInt(-1);
}
@@ -533,7 +505,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override public void writeBooleanArray(boolean[] val) {
if (val != null)
- lastFinished = writeArray(val, BOOLEAN_ARR_OFF, val.length, val.length);
+ lastFinished = writeArray(val, GridUnsafe.BOOLEAN_ARR_OFF, val.length, val.length);
else
writeInt(-1);
}
@@ -793,7 +765,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
buf.position(pos + 1);
- return UNSAFE.getByte(heapArr, baseOff + pos);
+ return GridUnsafe.getByte(heapArr, baseOff + pos);
}
else
return 0;
@@ -808,7 +780,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
buf.position(pos + 2);
- return UNSAFE.getShort(heapArr, baseOff + pos);
+ return GridUnsafe.getShortAligned(heapArr, baseOff + pos);
}
else
return 0;
@@ -823,7 +795,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
while (buf.hasRemaining()) {
int pos = buf.position();
- byte b = UNSAFE.getByte(heapArr, baseOff + pos);
+ byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
buf.position(pos + 1);
@@ -860,7 +832,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
while (buf.hasRemaining()) {
int pos = buf.position();
- byte b = UNSAFE.getByte(heapArr, baseOff + pos);
+ byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
buf.position(pos + 1);
@@ -897,7 +869,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
buf.position(pos + 4);
- return UNSAFE.getFloat(heapArr, baseOff + pos);
+ return GridUnsafe.getFloatAligned(heapArr, baseOff + pos);
}
else
return 0;
@@ -912,7 +884,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
buf.position(pos + 8);
- return UNSAFE.getDouble(heapArr, baseOff + pos);
+ return GridUnsafe.getDoubleAligned(heapArr, baseOff + pos);
}
else
return 0;
@@ -927,7 +899,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
buf.position(pos + 2);
- return UNSAFE.getChar(heapArr, baseOff + pos);
+ return GridUnsafe.getCharAligned(heapArr, baseOff + pos);
}
else
return 0;
@@ -942,7 +914,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
buf.position(pos + 1);
- return UNSAFE.getBoolean(heapArr, baseOff + pos);
+ return GridUnsafe.getBoolean(heapArr, baseOff + pos);
}
else
return false;
@@ -950,42 +922,42 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
/** {@inheritDoc} */
@Override public byte[] readByteArray() {
- return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF);
+ return readArray(BYTE_ARR_CREATOR, 0, GridUnsafe.BYTE_ARR_OFF);
}
/** {@inheritDoc} */
@Override public short[] readShortArray() {
- return readArray(SHORT_ARR_CREATOR, 1, SHORT_ARR_OFF);
+ return readArray(SHORT_ARR_CREATOR, 1, GridUnsafe.SHORT_ARR_OFF);
}
/** {@inheritDoc} */
@Override public int[] readIntArray() {
- return readArray(INT_ARR_CREATOR, 2, INT_ARR_OFF);
+ return readArray(INT_ARR_CREATOR, 2, GridUnsafe.INT_ARR_OFF);
}
/** {@inheritDoc} */
@Override public long[] readLongArray() {
- return readArray(LONG_ARR_CREATOR, 3, LONG_ARR_OFF);
+ return readArray(LONG_ARR_CREATOR, 3, GridUnsafe.LONG_ARR_OFF);
}
/** {@inheritDoc} */
@Override public float[] readFloatArray() {
- return readArray(FLOAT_ARR_CREATOR, 2, FLOAT_ARR_OFF);
+ return readArray(FLOAT_ARR_CREATOR, 2, GridUnsafe.FLOAT_ARR_OFF);
}
/** {@inheritDoc} */
@Override public double[] readDoubleArray() {
- return readArray(DOUBLE_ARR_CREATOR, 3, DOUBLE_ARR_OFF);
+ return readArray(DOUBLE_ARR_CREATOR, 3, GridUnsafe.DOUBLE_ARR_OFF);
}
/** {@inheritDoc} */
@Override public char[] readCharArray() {
- return readArray(CHAR_ARR_CREATOR, 1, CHAR_ARR_OFF);
+ return readArray(CHAR_ARR_CREATOR, 1, GridUnsafe.CHAR_ARR_OFF);
}
/** {@inheritDoc} */
@Override public boolean[] readBooleanArray() {
- return readArray(BOOLEAN_ARR_CREATOR, 0, BOOLEAN_ARR_OFF);
+ return readArray(BOOLEAN_ARR_CREATOR, 0, GridUnsafe.BOOLEAN_ARR_OFF);
}
/** {@inheritDoc} */
@@ -1289,7 +1261,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
if (toWrite <= remaining) {
if (toWrite > 0) {
- UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);
+ GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);
buf.position(pos + toWrite);
}
@@ -1300,7 +1272,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
}
else {
if (remaining > 0) {
- UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
+ GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
buf.position(pos + remaining);
@@ -1351,7 +1323,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
lastFinished = toRead <= remaining;
if (lastFinished) {
- UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
+ GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
buf.position(pos + toRead);
@@ -1364,7 +1336,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
return arr;
}
else {
- UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
+ GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
buf.position(pos + remaining);
@@ -1583,7 +1555,7 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
/**
* Array creator.
*/
- private static interface ArrayCreator<T> {
+ private interface ArrayCreator<T> {
/**
* @param len Array length or {@code -1} if array was not fully read.
* @return New array.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v3/DirectByteBufferStreamImplV3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v3/DirectByteBufferStreamImplV3.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v3/DirectByteBufferStreamImplV3.java
new file mode 100644
index 0000000..5e68408
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v3/DirectByteBufferStreamImplV3.java
@@ -0,0 +1,1595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct.stream.v3;
+
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.UUID;
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import sun.nio.ch.DirectBuffer;
+
+/**
+ * Direct marshalling I/O stream (version 3).
+ */
+public class DirectByteBufferStreamImplV3 implements DirectByteBufferStream {
+ /** Whether the native byte order of this platform is big endian. */
+ private static final boolean BIG_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
+
+ /** Shared zero-length {@code byte[]} returned by {@code BYTE_ARR_CREATOR} for length 0. */
+ private static final byte[] BYTE_ARR_EMPTY = new byte[0];
+
+ /** Shared zero-length {@code short[]} returned by {@code SHORT_ARR_CREATOR} for length 0. */
+ private static final short[] SHORT_ARR_EMPTY = new short[0];
+
+ /** Shared zero-length {@code int[]} returned by {@code INT_ARR_CREATOR} for length 0. */
+ private static final int[] INT_ARR_EMPTY = U.EMPTY_INTS;
+
+ /** Shared zero-length {@code long[]} returned by {@code LONG_ARR_CREATOR} for length 0. */
+ private static final long[] LONG_ARR_EMPTY = U.EMPTY_LONGS;
+
+ /** Shared zero-length {@code float[]} returned by {@code FLOAT_ARR_CREATOR} for length 0. */
+ private static final float[] FLOAT_ARR_EMPTY = new float[0];
+
+ /** Shared zero-length {@code double[]} returned by {@code DOUBLE_ARR_CREATOR} for length 0. */
+ private static final double[] DOUBLE_ARR_EMPTY = new double[0];
+
+ /** Shared zero-length {@code char[]} returned by {@code CHAR_ARR_CREATOR} for length 0. */
+ private static final char[] CHAR_ARR_EMPTY = new char[0];
+
+ /** Shared zero-length {@code boolean[]} returned by {@code BOOLEAN_ARR_CREATOR} for length 0. */
+ private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0];
+
+ /** Creates {@code byte[]} instances; a zero length yields the shared {@code BYTE_ARR_EMPTY}. */
+ private static final ArrayCreator<byte[]> BYTE_ARR_CREATOR = new ArrayCreator<byte[]>() {
+     @Override public byte[] create(int len) {
+         assert len >= 0;
+
+         return len == 0 ? BYTE_ARR_EMPTY : new byte[len];
+     }
+ };
+
+ /** Creates {@code short[]} instances; a zero length yields the shared {@code SHORT_ARR_EMPTY}. */
+ private static final ArrayCreator<short[]> SHORT_ARR_CREATOR = new ArrayCreator<short[]>() {
+     @Override public short[] create(int len) {
+         assert len >= 0;
+
+         return len == 0 ? SHORT_ARR_EMPTY : new short[len];
+     }
+ };
+
+ /** Creates {@code int[]} instances; a zero length yields the shared {@code INT_ARR_EMPTY}. */
+ private static final ArrayCreator<int[]> INT_ARR_CREATOR = new ArrayCreator<int[]>() {
+     @Override public int[] create(int len) {
+         assert len >= 0;
+
+         return len == 0 ? INT_ARR_EMPTY : new int[len];
+     }
+ };
+
+ /** Creates {@code long[]} instances; a zero length yields the shared {@code LONG_ARR_EMPTY}. */
+ private static final ArrayCreator<long[]> LONG_ARR_CREATOR = new ArrayCreator<long[]>() {
+     @Override public long[] create(int len) {
+         assert len >= 0;
+
+         return len == 0 ? LONG_ARR_EMPTY : new long[len];
+     }
+ };
+
+ /** Creates {@code float[]} instances; a zero length yields the shared {@code FLOAT_ARR_EMPTY}. */
+ private static final ArrayCreator<float[]> FLOAT_ARR_CREATOR = new ArrayCreator<float[]>() {
+     @Override public float[] create(int len) {
+         assert len >= 0;
+
+         return len == 0 ? FLOAT_ARR_EMPTY : new float[len];
+     }
+ };
+
+ /** Creates {@code double[]} instances; a zero length yields the shared {@code DOUBLE_ARR_EMPTY}. */
+ private static final ArrayCreator<double[]> DOUBLE_ARR_CREATOR = new ArrayCreator<double[]>() {
+     @Override public double[] create(int len) {
+         assert len >= 0;
+
+         return len == 0 ? DOUBLE_ARR_EMPTY : new double[len];
+     }
+ };
+
+ /** Creates {@code char[]} instances; a zero length yields the shared {@code CHAR_ARR_EMPTY}. */
+ private static final ArrayCreator<char[]> CHAR_ARR_CREATOR = new ArrayCreator<char[]>() {
+     @Override public char[] create(int len) {
+         assert len >= 0;
+
+         return len == 0 ? CHAR_ARR_EMPTY : new char[len];
+     }
+ };
+
+ /** Creates {@code boolean[]} instances; a zero length yields the shared {@code BOOLEAN_ARR_EMPTY}. */
+ private static final ArrayCreator<boolean[]> BOOLEAN_ARR_CREATOR = new ArrayCreator<boolean[]>() {
+     @Override public boolean[] create(int len) {
+         assert len >= 0;
+
+         return len == 0 ? BOOLEAN_ARR_EMPTY : new boolean[len];
+     }
+ };
+
+ /** Sentinel meaning "no element is currently pending"; the write loops compare {@code arrCur}, {@code mapCur} and {@code cur} against it. */
+ private static final Object NULL = new Object();
+
+ /** Message factory supplied to the constructor. */
+ private final MessageFactory msgFactory;
+
+ /** Buffer currently being read from or written to; assigned by {@code setBuffer}. */
+ private ByteBuffer buf;
+
+ /** Backing array of {@code buf}, or {@code null} when {@code buf} is a direct buffer. */
+ private byte[] heapArr;
+
+ /** Native address of a direct {@code buf}, or the {@code byte[]} base offset for a heap {@code buf}. */
+ private long baseOff;
+
+ /** */
+ private int arrOff = -1;
+
+ /** */
+ private Object tmpArr;
+
+ /** */
+ private int tmpArrOff;
+
+ /** */
+ private int tmpArrBytes;
+
+ /** */
+ private boolean msgTypeDone;
+
+ /** */
+ private Message msg;
+
+ /** Entry iterator of the map being written by {@code writeMap}; {@code null} when no map write is in progress. */
+ private Iterator<?> mapIt;
+
+ /** Iterator of the collection being written by {@code writeCollection}; {@code null} when no such write is in progress. */
+ private Iterator<?> it;
+
+ /** Index of the next element to write in {@code writeObjectArray} / {@code writeRandomAccessList}; {@code -1} until the length has been written. */
+ private int arrPos = -1;
+
+ /** Element taken by {@code writeObjectArray} / {@code writeRandomAccessList} but not yet fully written, or {@code NULL}. */
+ private Object arrCur = NULL;
+
+ /** Map entry taken by {@code writeMap} but not yet fully written, or {@code NULL}. */
+ private Object mapCur = NULL;
+
+ /** Collection element taken by {@code writeCollection} but not yet fully written, or {@code NULL}. */
+ private Object cur = NULL;
+
+ /** Whether the key of the entry held in {@code mapCur} has already been written by {@code writeMap}. */
+ private boolean keyDone;
+
+ /** */
+ private int readSize = -1;
+
+ /** */
+ private int readItems;
+
+ /** */
+ private Object[] objArr;
+
+ /** */
+ private Collection<Object> col;
+
+ /** */
+ private Map<Object, Object> map;
+
+ /** Bits accumulated so far by {@code readInt} / {@code readLong} for a value that is still being decoded. */
+ private long prim;
+
+ /** Number of 7-bit groups already accumulated in {@code prim}. */
+ private int primShift;
+
+ /** Next step of {@code writeUuid} / {@code writeIgniteUuid}; {@code 0} when no such write is in progress. */
+ private int uuidState;
+
+ /** */
+ private long uuidMost;
+
+ /** */
+ private long uuidLeast;
+
+ /** */
+ private long uuidLocId;
+
+ /** Whether the most recent read or write operation completed in full. */
+ private boolean lastFinished;
+
+ /**
+ * Creates a stream that only stores the message factory; no buffer is attached yet,
+ * so {@code setBuffer} has to be called before any read or write method.
+ *
+ * @param msgFactory Message factory.
+ */
+ public DirectByteBufferStreamImplV3(MessageFactory msgFactory) {
+ this.msgFactory = msgFactory;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Caches the backing array (heap buffers) or the native address (direct buffers) so that
+ * subsequent reads and writes can address the buffer content directly. For heap buffers the
+ * buffer's {@code arrayOffset()} is folded into the base offset, because buffer position
+ * {@code p} corresponds to array index {@code p + arrayOffset()}; without it a sliced heap
+ * buffer would be accessed at the wrong location. Nothing is recomputed when the same buffer
+ * instance is passed again.
+ *
+ * @param buf Buffer to attach; must not be {@code null}.
+ */
+ @Override public void setBuffer(ByteBuffer buf) {
+     assert buf != null;
+
+     if (this.buf != buf) {
+         this.buf = buf;
+
+         if (buf.isDirect()) {
+             heapArr = null;
+             baseOff = ((DirectBuffer)buf).address();
+         }
+         else {
+             heapArr = buf.array();
+             baseOff = GridUnsafe.BYTE_ARR_OFF + buf.arrayOffset();
+         }
+     }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns {@code remaining()} of the currently attached buffer.
+ */
+ @Override public int remaining() {
+ return buf.remaining();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the flag set by the most recent read or write method of this stream.
+ */
+ @Override public boolean lastFinished() {
+ return lastFinished;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes nothing and clears {@code lastFinished} when the buffer has no free byte.
+ */
+ @Override public void writeByte(byte val) {
+     if (buf.remaining() < 1) {
+         lastFinished = false;
+
+         return;
+     }
+
+     lastFinished = true;
+
+     int idx = buf.position();
+
+     GridUnsafe.putByte(heapArr, baseOff + idx, val);
+
+     buf.position(idx + 1);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Needs 2 free bytes. On big-endian platforms the bytes are reversed before the store.
+ */
+ @Override public void writeShort(short val) {
+     if (buf.remaining() < 2) {
+         lastFinished = false;
+
+         return;
+     }
+
+     lastFinished = true;
+
+     int idx = buf.position();
+
+     short out = BIG_ENDIAN ? Short.reverseBytes(val) : val;
+
+     GridUnsafe.putShortAligned(heapArr, baseOff + idx, out);
+
+     buf.position(idx + 2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeInt(int val) {
+ lastFinished = buf.remaining() >= 5;
+
+ if (lastFinished) {
+ if (val == Integer.MAX_VALUE)
+ val = Integer.MIN_VALUE;
+ else
+ val++;
+
+ int pos = buf.position();
+
+ while ((val & 0xFFFF_FF80) != 0) {
+ byte b = (byte)(val | 0x80);
+
+ GridUnsafe.putByte(heapArr, baseOff + pos++, b);
+
+ val >>>= 7;
+ }
+
+ GridUnsafe.putByte(heapArr, baseOff + pos++, (byte)val);
+
+ buf.position(pos);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLong(long val) {
+ lastFinished = buf.remaining() >= 10;
+
+ if (lastFinished) {
+ if (val == Long.MAX_VALUE)
+ val = Long.MIN_VALUE;
+ else
+ val++;
+
+ int pos = buf.position();
+
+ while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
+ byte b = (byte)(val | 0x80);
+
+ GridUnsafe.putByte(heapArr, baseOff + pos++, b);
+
+ val >>>= 7;
+ }
+
+ GridUnsafe.putByte(heapArr, baseOff + pos++, (byte)val);
+
+ buf.position(pos);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Needs 4 free bytes. The value is stored as its {@code Float.floatToIntBits} pattern,
+ * byte-reversed first on big-endian platforms.
+ */
+ @Override public void writeFloat(float val) {
+     if (buf.remaining() < 4) {
+         lastFinished = false;
+
+         return;
+     }
+
+     lastFinished = true;
+
+     int idx = buf.position();
+
+     int bits = Float.floatToIntBits(val);
+
+     GridUnsafe.putIntAligned(heapArr, baseOff + idx, BIG_ENDIAN ? Integer.reverseBytes(bits) : bits);
+
+     buf.position(idx + 4);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Needs 8 free bytes. The value is stored as its {@code Double.doubleToLongBits} pattern,
+ * byte-reversed first on big-endian platforms.
+ */
+ @Override public void writeDouble(double val) {
+     if (buf.remaining() < 8) {
+         lastFinished = false;
+
+         return;
+     }
+
+     lastFinished = true;
+
+     int idx = buf.position();
+
+     long bits = Double.doubleToLongBits(val);
+
+     GridUnsafe.putLongAligned(heapArr, baseOff + idx, BIG_ENDIAN ? Long.reverseBytes(bits) : bits);
+
+     buf.position(idx + 8);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Needs 2 free bytes. On big-endian platforms the bytes are reversed before the store.
+ */
+ @Override public void writeChar(char val) {
+     if (buf.remaining() < 2) {
+         lastFinished = false;
+
+         return;
+     }
+
+     lastFinished = true;
+
+     int idx = buf.position();
+
+     char out = BIG_ENDIAN ? Character.reverseBytes(val) : val;
+
+     GridUnsafe.putCharAligned(heapArr, baseOff + idx, out);
+
+     buf.position(idx + 2);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes nothing and clears {@code lastFinished} when the buffer has no free byte.
+ */
+ @Override public void writeBoolean(boolean val) {
+     if (buf.remaining() < 1) {
+         lastFinished = false;
+
+         return;
+     }
+
+     lastFinished = true;
+
+     int idx = buf.position();
+
+     GridUnsafe.putBoolean(heapArr, baseOff + idx, val);
+
+     buf.position(idx + 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeByteArray(byte[] val) {
+ if (val != null)
+ lastFinished = writeArray(val, GridUnsafe.BYTE_ARR_OFF, val.length, val.length);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeByteArray(byte[] val, long off, int len) {
+ if (val != null)
+ lastFinished = writeArray(val, GridUnsafe.BYTE_ARR_OFF + off, len, len);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeShortArray(short[] val) {
+ if (val != null)
+ lastFinished = writeArray(val, GridUnsafe.SHORT_ARR_OFF, val.length, val.length << 1);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeIntArray(int[] val) {
+ if (val != null)
+ lastFinished = writeArray(val, GridUnsafe.INT_ARR_OFF, val.length, val.length << 2);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLongArray(long[] val) {
+ if (val != null)
+ lastFinished = writeArray(val, GridUnsafe.LONG_ARR_OFF, val.length, val.length << 3);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeFloatArray(float[] val) {
+ if (val != null)
+ lastFinished = writeArray(val, GridUnsafe.FLOAT_ARR_OFF, val.length, val.length << 2);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeDoubleArray(double[] val) {
+ if (val != null)
+ lastFinished = writeArray(val, GridUnsafe.DOUBLE_ARR_OFF, val.length, val.length << 3);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeCharArray(char[] val) {
+ if (val != null)
+ lastFinished = writeArray(val, GridUnsafe.CHAR_ARR_OFF, val.length, val.length << 1);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBooleanArray(boolean[] val) {
+ if (val != null)
+ lastFinished = writeArray(val, GridUnsafe.BOOLEAN_ARR_OFF, val.length, val.length);
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeString(String val) {
+ writeByteArray(val != null ? val.getBytes() : null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBitSet(BitSet val) {
+ writeLongArray(val != null ? val.toLongArray() : null);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes a null flag and, for a non-null value, the most and then the least significant
+ * 64 bits. {@code uuidState} records the next step: when a step cannot complete the method
+ * returns without resetting it, so the next call re-enters the switch at that step. Each
+ * {@code case} intentionally falls through to the following one.
+ */
+ @Override public void writeUuid(UUID val) {
+ switch (uuidState) {
+ case 0:
+ writeBoolean(val == null);
+
+ if (!lastFinished || val == null)
+ return;
+
+ uuidState++;
+
+ case 1: // falls through from case 0
+ writeLong(val.getMostSignificantBits());
+
+ if (!lastFinished)
+ return;
+
+ uuidState++;
+
+ case 2: // falls through from case 1
+ writeLong(val.getLeastSignificantBits());
+
+ if (!lastFinished)
+ return;
+
+ uuidState = 0; // value fully written; ready for the next UUID
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes a null flag and, for a non-null value, the most and least significant bits of the
+ * global ID followed by the local ID. Progress is tracked in {@code uuidState} (the same
+ * field {@code writeUuid} uses): when a step cannot complete the method returns without
+ * resetting it, so the next call re-enters the switch at that step. Each {@code case}
+ * intentionally falls through to the following one.
+ */
+ @Override public void writeIgniteUuid(IgniteUuid val) {
+ switch (uuidState) {
+ case 0:
+ writeBoolean(val == null);
+
+ if (!lastFinished || val == null)
+ return;
+
+ uuidState++;
+
+ case 1: // falls through from case 0
+ writeLong(val.globalId().getMostSignificantBits());
+
+ if (!lastFinished)
+ return;
+
+ uuidState++;
+
+ case 2: // falls through from case 1
+ writeLong(val.globalId().getLeastSignificantBits());
+
+ if (!lastFinished)
+ return;
+
+ uuidState++;
+
+ case 3: // falls through from case 2
+ writeLong(val.localId());
+
+ if (!lastFinished)
+ return;
+
+ uuidState = 0; // value fully written; ready for the next ID
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * A {@code null} message is written as the single byte {@code Byte.MIN_VALUE}. A non-null
+ * message is delegated to {@code msg.writeTo(buf, writer)}, bracketed by the writer's
+ * {@code beforeInnerMessageWrite} / {@code afterInnerMessageWrite} callbacks; if the buffer
+ * is already full nothing is attempted and {@code lastFinished} is cleared.
+ */
+ @Override public void writeMessage(Message msg, MessageWriter writer) {
+ if (msg != null) {
+ if (buf.hasRemaining()) {
+ try {
+ writer.beforeInnerMessageWrite();
+
+ lastFinished = msg.writeTo(buf, writer);
+ }
+ finally {
+ // NOTE(review): if writeTo throws, lastFinished still holds the previous operation's result here - confirm that is intended.
+ writer.afterInnerMessageWrite(lastFinished);
+ }
+ }
+ else
+ lastFinished = false;
+ }
+ else
+ writeByte(Byte.MIN_VALUE);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writes the array length and then each element via {@code write}. A {@code null} array is
+ * written as the integer {@code -1}. The method is resumable: {@code arrPos} stays at
+ * {@code -1} until the length has been written, and {@code arrCur} keeps the element whose
+ * write did not complete so that the next call retries it.
+ */
+ @Override public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType,
+ MessageWriter writer) {
+ if (arr != null) {
+ int len = arr.length;
+
+ if (arrPos == -1) { // length not written yet
+ writeInt(len);
+
+ if (!lastFinished)
+ return;
+
+ arrPos = 0;
+ }
+
+ while (arrPos < len || arrCur != NULL) { // second condition: a pending element still has to be finished
+ if (arrCur == NULL)
+ arrCur = arr[arrPos++];
+
+ write(itemType, arrCur, writer);
+
+ if (!lastFinished)
+ return;
+
+ arrCur = NULL;
+ }
+
+ arrPos = -1; // reset for the next array
+ }
+ else
+ writeInt(-1);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * A {@code null} collection is written as the integer {@code -1}. Lists that implement
+ * {@code RandomAccess} are handed to {@code writeRandomAccessList}; every other collection
+ * is written as its size followed by each element obtained from its iterator. The method is
+ * resumable: {@code it} is {@code null} until the size has been written, and {@code cur}
+ * keeps the element whose write did not complete so that the next call retries it.
+ */
+ @Override public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType,
+ MessageWriter writer) {
+ if (col != null) {
+ if (col instanceof List && col instanceof RandomAccess)
+ writeRandomAccessList((List<T>)col, itemType, writer);
+ else {
+ if (it == null) { // size not written yet
+ writeInt(col.size());
+
+ if (!lastFinished)
+ return;
+
+ it = col.iterator();
+ }
+
+ while (it.hasNext() || cur != NULL) { // second condition: a pending element still has to be finished
+ if (cur == NULL)
+ cur = it.next();
+
+ write(itemType, cur, writer);
+
+ if (!lastFinished)
+ return;
+
+ cur = NULL;
+ }
+
+ it = null; // reset for the next collection
+ }
+ }
+ else
+ writeInt(-1);
+ }
+
+ /**
+ * Writes the list size followed by each element, reading elements by index instead of through
+ * an iterator. Resumable in the same way as {@code writeObjectArray}, whose {@code arrPos} and
+ * {@code arrCur} fields it shares: {@code arrPos} stays at {@code -1} until the size has been
+ * written and {@code arrCur} keeps the element whose write did not complete.
+ *
+ * @param list List.
+ * @param itemType Component type.
+ * @param writer Writer.
+ */
+ private <T> void writeRandomAccessList(List<T> list, MessageCollectionItemType itemType, MessageWriter writer) {
+ assert list instanceof RandomAccess;
+
+ int size = list.size();
+
+ if (arrPos == -1) { // size not written yet
+ writeInt(size);
+
+ if (!lastFinished)
+ return;
+
+ arrPos = 0;
+ }
+
+ while (arrPos < size || arrCur != NULL) { // second condition: a pending element still has to be finished
+ if (arrCur == NULL)
+ arrCur = list.get(arrPos++);
+
+ write(itemType, arrCur, writer);
+
+ if (!lastFinished)
+ return;
+
+ arrCur = NULL;
+ }
+
+ arrPos = -1; // reset for the next list
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * A {@code null} map is written as the integer {@code -1}; otherwise the size is written,
+ * followed by the key and value of every entry. The method is resumable: {@code mapIt} is
+ * {@code null} until the size has been written, {@code mapCur} keeps the entry currently
+ * being written, and {@code keyDone} remembers that its key is already out so that a
+ * retry continues with the value.
+ */
+ @SuppressWarnings("unchecked")
+ @Override public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType,
+ MessageCollectionItemType valType, MessageWriter writer) {
+ if (map != null) {
+ if (mapIt == null) { // size not written yet
+ writeInt(map.size());
+
+ if (!lastFinished)
+ return;
+
+ mapIt = map.entrySet().iterator();
+ }
+
+ while (mapIt.hasNext() || mapCur != NULL) { // second condition: a pending entry still has to be finished
+ Map.Entry<K, V> e;
+
+ if (mapCur == NULL)
+ mapCur = mapIt.next();
+
+ e = (Map.Entry<K, V>)mapCur;
+
+ if (!keyDone) {
+ write(keyType, e.getKey(), writer);
+
+ if (!lastFinished)
+ return;
+
+ keyDone = true;
+ }
+
+ write(valType, e.getValue(), writer);
+
+ if (!lastFinished)
+ return;
+
+ mapCur = NULL;
+ keyDone = false;
+ }
+
+ mapIt = null; // reset for the next map
+ }
+ else
+ writeInt(-1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte readByte() {
+ lastFinished = buf.remaining() >= 1;
+
+ if (lastFinished) {
+ int pos = buf.position();
+
+ buf.position(pos + 1);
+
+ return GridUnsafe.getByte(heapArr, baseOff + pos);
+ }
+ else
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short readShort() {
+ lastFinished = buf.remaining() >= 2;
+
+ if (lastFinished) {
+ int pos = buf.position();
+
+ buf.position(pos + 2);
+
+ short val = GridUnsafe.getShortAligned(heapArr, baseOff + pos);
+
+ if (BIG_ENDIAN)
+ val = Short.reverseBytes(val);
+
+ return val;
+ }
+ else
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readInt() {
+ lastFinished = false;
+
+ int val = 0;
+
+ while (buf.hasRemaining()) {
+ int pos = buf.position();
+
+ byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
+
+ buf.position(pos + 1);
+
+ prim |= ((long)b & 0x7F) << (7 * primShift);
+
+ if ((b & 0x80) == 0) {
+ lastFinished = true;
+
+ val = (int)prim;
+
+ if (val == Integer.MIN_VALUE)
+ val = Integer.MAX_VALUE;
+ else
+ val--;
+
+ prim = 0;
+ primShift = 0;
+
+ break;
+ }
+ else
+ primShift++;
+ }
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long readLong() {
+ lastFinished = false;
+
+ long val = 0;
+
+ while (buf.hasRemaining()) {
+ int pos = buf.position();
+
+ byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
+
+ buf.position(pos + 1);
+
+ prim |= ((long)b & 0x7F) << (7 * primShift);
+
+ if ((b & 0x80) == 0) {
+ lastFinished = true;
+
+ val = prim;
+
+ if (val == Long.MIN_VALUE)
+ val = Long.MAX_VALUE;
+ else
+ val--;
+
+ prim = 0;
+ primShift = 0;
+
+ break;
+ }
+ else
+ primShift++;
+ }
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float readFloat() {
+ lastFinished = buf.remaining() >= 4;
+
+ if (lastFinished) {
+ int pos = buf.position();
+
+ buf.position(pos + 4);
+
+ int val = GridUnsafe.getIntAligned(heapArr, baseOff + pos);
+
+ if (BIG_ENDIAN)
+ val = Integer.reverseBytes(val);
+
+ return Float.intBitsToFloat(val);
+ }
+ else
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public double readDouble() {
+ lastFinished = buf.remaining() >= 8;
+
+ if (lastFinished) {
+ int pos = buf.position();
+
+ buf.position(pos + 8);
+
+ long val = GridUnsafe.getLongAligned(heapArr, baseOff + pos);
+
+ if (BIG_ENDIAN)
+ val = Long.reverseBytes(val);
+
+ return Double.longBitsToDouble(val);
+ }
+ else
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public char readChar() {
+ lastFinished = buf.remaining() >= 2;
+
+ if (lastFinished) {
+ int pos = buf.position();
+
+ buf.position(pos + 2);
+
+ char val = GridUnsafe.getCharAligned(heapArr, baseOff + pos);
+
+ if (BIG_ENDIAN)
+ val = Character.reverseBytes(val);
+
+ return val;
+ }
+ else
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readBoolean() {
+ lastFinished = buf.hasRemaining();
+
+ if (lastFinished) {
+ int pos = buf.position();
+
+ buf.position(pos + 1);
+
+ return GridUnsafe.getBoolean(heapArr, baseOff + pos);
+ }
+ else
+ return false;
+ }
+
+ /** {@inheritDoc} Delegates to {@code readArray} with length shift 0, i.e. byte length equals element count. */
+ @Override public byte[] readByteArray() {
+ return readArray(BYTE_ARR_CREATOR, 0, GridUnsafe.BYTE_ARR_OFF);
+ }
+
+ /** {@inheritDoc} Delegates to {@code readArray} with length shift 1, i.e. byte length is element count {@code << 1}. */
+ @Override public short[] readShortArray() {
+ return readArray(SHORT_ARR_CREATOR, 1, GridUnsafe.SHORT_ARR_OFF);
+ }
+
+ /** {@inheritDoc} Delegates to {@code readArray} with length shift 2, i.e. byte length is element count {@code << 2}. */
+ @Override public int[] readIntArray() {
+ return readArray(INT_ARR_CREATOR, 2, GridUnsafe.INT_ARR_OFF);
+ }
+
+ /** {@inheritDoc} Delegates to {@code readArray} with length shift 3, i.e. byte length is element count {@code << 3}. */
+ @Override public long[] readLongArray() {
+ return readArray(LONG_ARR_CREATOR, 3, GridUnsafe.LONG_ARR_OFF);
+ }
+
+ /** {@inheritDoc} Delegates to {@code readArray} with length shift 2, i.e. byte length is element count {@code << 2}. */
+ @Override public float[] readFloatArray() {
+ return readArray(FLOAT_ARR_CREATOR, 2, GridUnsafe.FLOAT_ARR_OFF);
+ }
+
+ /** {@inheritDoc} Delegates to {@code readArray} with length shift 3, i.e. byte length is element count {@code << 3}. */
+ @Override public double[] readDoubleArray() {
+ return readArray(DOUBLE_ARR_CREATOR, 3, GridUnsafe.DOUBLE_ARR_OFF);
+ }
+
+ /** {@inheritDoc} Delegates to {@code readArray} with length shift 1, i.e. byte length is element count {@code << 1}. */
+ @Override public char[] readCharArray() {
+ return readArray(CHAR_ARR_CREATOR, 1, GridUnsafe.CHAR_ARR_OFF);
+ }
+
+ /** {@inheritDoc} Delegates to {@code readArray} with length shift 0, i.e. byte length equals element count. */
+ @Override public boolean[] readBooleanArray() {
+ return readArray(BOOLEAN_ARR_CREATOR, 0, GridUnsafe.BOOLEAN_ARR_OFF);
+ }
+
+ /** {@inheritDoc} The string is read as a byte array; a {@code null} array (absent or not fully read yet) yields {@code null}. */
+ @Override public String readString() {
+ byte[] arr = readByteArray();
+
+ return arr != null ? new String(arr) : null; // NOTE(review): decodes with the platform default charset; must match the writing side -- confirm.
+ }
+
+ /** {@inheritDoc} */
+ @Override public BitSet readBitSet() {
+     // The bit set is read as a long array.
+     long[] words = readLongArray();
+
+     // A null array (absent value or incomplete read) is passed through as null.
+     if (words == null)
+         return null;
+
+     return BitSet.valueOf(words);
+ }
+
+ /** {@inheritDoc} Resumable via {@code uuidState}: 0 = null flag, 1 = most significant bits, 2 = least significant bits. */
+ @Override public UUID readUuid() {
+ switch (uuidState) {
+ case 0:
+ boolean isNull = readBoolean();
+
+ if (!lastFinished || isNull) // Flag not available yet, or the flag says the value is null.
+ return null;
+
+ uuidState++;
+ // Intentional fall-through to the next stage.
+ case 1:
+ uuidMost = readLong();
+
+ if (!lastFinished) // Out of data: uuidState keeps the stage to resume from.
+ return null;
+
+ uuidState++;
+ // Intentional fall-through to the next stage.
+ case 2:
+ uuidLeast = readLong();
+
+ if (!lastFinished)
+ return null;
+
+ uuidState = 0; // All parts read: reset the stage for the next value.
+ }
+
+ UUID val = new UUID(uuidMost, uuidLeast);
+
+ uuidMost = 0;
+ uuidLeast = 0;
+
+ return val;
+ }
+
+ /** {@inheritDoc} Resumable via {@code uuidState}: 0 = null flag, 1 = most significant bits, 2 = least significant bits, 3 = local ID. */
+ @Override public IgniteUuid readIgniteUuid() {
+ switch (uuidState) {
+ case 0:
+ boolean isNull = readBoolean();
+
+ if (!lastFinished || isNull) // Flag not available yet, or the flag says the value is null.
+ return null;
+
+ uuidState++;
+ // Intentional fall-through to the next stage.
+ case 1:
+ uuidMost = readLong();
+
+ if (!lastFinished) // Out of data: uuidState keeps the stage to resume from.
+ return null;
+
+ uuidState++;
+ // Intentional fall-through to the next stage.
+ case 2:
+ uuidLeast = readLong();
+
+ if (!lastFinished)
+ return null;
+
+ uuidState++;
+ // Intentional fall-through to the next stage.
+ case 3:
+ uuidLocId = readLong();
+
+ if (!lastFinished)
+ return null;
+
+ uuidState = 0; // All parts read: reset the stage for the next value.
+ }
+
+ IgniteUuid val = new IgniteUuid(new UUID(uuidMost, uuidLeast), uuidLocId);
+
+ uuidMost = 0;
+ uuidLeast = 0;
+ uuidLocId = 0;
+
+ return val;
+ }
+
+ /** {@inheritDoc} Reads a one-byte message type, creates the message via {@code msgFactory} and lets it read itself; both steps resume on a later call if data runs out. */
+ @SuppressWarnings("unchecked")
+ @Override public <T extends Message> T readMessage(MessageReader reader) {
+ if (!msgTypeDone) { // The type byte has not been consumed yet.
+ if (!buf.hasRemaining()) {
+ lastFinished = false;
+
+ return null;
+ }
+
+ byte type = readByte();
+
+ msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type); // Byte.MIN_VALUE marks a null message.
+
+ msgTypeDone = true;
+ }
+
+ if (msg != null) {
+ try {
+ reader.beforeInnerMessageRead();
+
+ reader.setCurrentReadClass(msg.getClass());
+
+ lastFinished = msg.readFrom(buf, reader);
+ }
+ finally {
+ reader.afterInnerMessageRead(lastFinished); // Runs even if readFrom throws.
+ }
+ }
+ else
+ lastFinished = true; // Null message: nothing more to read.
+
+ if (lastFinished) { // Done: clear the per-message state and hand the message over.
+ Message msg0 = msg;
+
+ msgTypeDone = false;
+ msg = null;
+
+ return (T)msg0;
+ }
+ else
+ return null; // Incomplete: msg and msgTypeDone are kept for the next call.
+ }
+
+ /** {@inheritDoc} Resumable: the size, the partially filled array and the next index are kept in {@code readSize}, {@code objArr} and {@code readItems} between calls. */
+ @SuppressWarnings("unchecked")
+ @Override public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls,
+ MessageReader reader) {
+ if (readSize == -1) { // -1: size not read yet (a received size of -1 leaves it at -1 and yields null below).
+ int size = readInt();
+
+ if (!lastFinished)
+ return null;
+
+ readSize = size;
+ }
+
+ if (readSize >= 0) {
+ if (objArr == null)
+ objArr = itemCls != null ? (Object[])Array.newInstance(itemCls, readSize) : new Object[readSize];
+
+ for (int i = readItems; i < readSize; i++) { // Continues from the first item not yet read.
+ Object item = read(itemType, reader);
+
+ if (!lastFinished) // Out of data: state is kept, the next call resumes at this item.
+ return null;
+
+ objArr[i] = item;
+
+ readItems++;
+ }
+ }
+
+ readSize = -1; // Finished: reset the state for the next array.
+ readItems = 0;
+ cur = null;
+
+ T[] objArr0 = (T[])objArr;
+
+ objArr = null;
+
+ return objArr0;
+ }
+
+ /** {@inheritDoc} Resumable: the size, the partially filled list and the item count are kept in {@code readSize}, {@code col} and {@code readItems} between calls. */
+ @SuppressWarnings("unchecked")
+ @Override public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType,
+ MessageReader reader) {
+ if (readSize == -1) { // -1: size not read yet (a received size of -1 leaves it at -1 and yields null below).
+ int size = readInt();
+
+ if (!lastFinished)
+ return null;
+
+ readSize = size;
+ }
+
+ if (readSize >= 0) {
+ if (col == null)
+ col = new ArrayList<>(readSize); // The result is always an ArrayList, whatever C the caller expects.
+
+ for (int i = readItems; i < readSize; i++) { // Continues from the first item not yet read.
+ Object item = read(itemType, reader);
+
+ if (!lastFinished) // Out of data: state is kept, the next call resumes at this item.
+ return null;
+
+ col.add(item);
+
+ readItems++;
+ }
+ }
+
+ readSize = -1; // Finished: reset the state for the next collection.
+ readItems = 0;
+ cur = null;
+
+ C col0 = (C)col;
+
+ col = null;
+
+ return col0;
+ }
+
+ /** {@inheritDoc} Resumable: size, map, entry count and a pending key are kept in {@code readSize}, {@code map}, {@code readItems} and {@code mapCur}/{@code keyDone} between calls. */
+ @SuppressWarnings("unchecked")
+ @Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType,
+ MessageCollectionItemType valType, boolean linked, MessageReader reader) {
+ if (readSize == -1) { // -1: size not read yet (a received size of -1 leaves it at -1 and yields null below).
+ int size = readInt();
+
+ if (!lastFinished)
+ return null;
+
+ readSize = size;
+ }
+
+ if (readSize >= 0) {
+ if (map == null)
+ map = linked ? U.newLinkedHashMap(readSize) : U.newHashMap(readSize);
+
+ for (int i = readItems; i < readSize; i++) { // Continues from the first entry not yet completed.
+ if (!keyDone) { // Skipped when the key was already read by an earlier, interrupted call.
+ Object key = read(keyType, reader);
+
+ if (!lastFinished)
+ return null;
+
+ mapCur = key; // Remember the key so that a retry resumes at the value.
+ keyDone = true;
+ }
+
+ Object val = read(valType, reader);
+
+ if (!lastFinished)
+ return null;
+
+ map.put(mapCur, val);
+
+ keyDone = false;
+
+ readItems++;
+ }
+ }
+
+ readSize = -1; // Finished: reset the state for the next map.
+ readItems = 0;
+ mapCur = null;
+
+ M map0 = (M)map;
+
+ map = null;
+
+ return map0;
+ }
+
+ /**
+ * @param arr Primitive array to copy from (asserted to be a non-null array of a primitive component type).
+ * @param off Offset of the array data within {@code arr}, used as the copy source offset.
+ * @param len Length in elements; written with {@code writeInt} before any data.
+ * @param bytes Length in bytes.
+ * @return Whether array was fully written; if {@code false}, {@code arrOff} holds the number of bytes copied so far.
+ */
+ private boolean writeArray(Object arr, long off, int len, int bytes) {
+ assert arr != null;
+ assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
+ assert off > 0;
+ assert len >= 0;
+ assert bytes >= 0;
+ assert bytes >= arrOff;
+
+ if (arrOff == -1) { // -1: nothing written yet, so the length prefix goes first.
+ writeInt(len);
+
+ if (!lastFinished)
+ return false;
+
+ arrOff = 0;
+ }
+
+ int toWrite = bytes - arrOff; // Bytes still to be copied.
+ int pos = buf.position();
+ int remaining = buf.remaining();
+
+ if (toWrite <= remaining) { // The rest of the array fits into the buffer.
+ if (toWrite > 0) {
+ GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);
+
+ buf.position(pos + toWrite);
+ }
+
+ arrOff = -1; // Done: reset for the next array.
+
+ return true;
+ }
+ else {
+ if (remaining > 0) { // Fill whatever space is left.
+ GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
+
+ buf.position(pos + remaining);
+
+ arrOff += remaining; // Remember the progress for the next call.
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * @param creator Array creator.
+ * @param lenShift Array length shift size: the byte length is computed as {@code len << lenShift}.
+ * @param off Offset of the array data within the created array, used as the copy target offset.
+ * @return Array, or {@code null} if a length of -1 was read or the array was not fully read yet (see {@code lastFinished}).
+ */
+ @SuppressWarnings("unchecked")
+ private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) {
+ assert creator != null;
+
+ if (tmpArr == null) { // No array in progress: start with the length prefix.
+ int len = readInt();
+
+ if (!lastFinished)
+ return null;
+
+ switch (len) {
+ case -1: // A length of -1 yields a null array.
+ lastFinished = true;
+
+ return null;
+
+ case 0: // Empty array: there is no data to copy.
+ lastFinished = true;
+
+ return creator.create(0);
+
+ default:
+ tmpArr = creator.create(len);
+ tmpArrBytes = len << lenShift;
+ }
+ }
+
+ int toRead = tmpArrBytes - tmpArrOff; // Bytes still missing from the array.
+ int remaining = buf.remaining();
+ int pos = buf.position();
+
+ lastFinished = toRead <= remaining;
+
+ if (lastFinished) {
+ GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
+
+ buf.position(pos + toRead);
+
+ T arr = (T)tmpArr;
+
+ tmpArr = null; // Done: reset the state for the next array.
+ tmpArrBytes = 0;
+ tmpArrOff = 0;
+
+ return arr;
+ }
+ else {
+ GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining); // Copy what is available.
+
+ buf.position(pos + remaining);
+
+ tmpArrOff += remaining; // tmpArr is kept; the next call continues from this offset.
+
+ return null;
+ }
+ }
+
+ /**
+ * @param type Item type; selects which {@code write*} method is called.
+ * @param val Value; cast to the Java type that corresponds to {@code type}.
+ * @param writer Writer; used only for {@code MSG} items.
+ */
+ private void write(MessageCollectionItemType type, Object val, MessageWriter writer) {
+ switch (type) {
+ case BYTE:
+ writeByte((Byte)val);
+
+ break;
+
+ case SHORT:
+ writeShort((Short)val);
+
+ break;
+
+ case INT:
+ writeInt((Integer)val);
+
+ break;
+
+ case LONG:
+ writeLong((Long)val);
+
+ break;
+
+ case FLOAT:
+ writeFloat((Float)val);
+
+ break;
+
+ case DOUBLE:
+ writeDouble((Double)val);
+
+ break;
+
+ case CHAR:
+ writeChar((Character)val);
+
+ break;
+
+ case BOOLEAN:
+ writeBoolean((Boolean)val);
+
+ break;
+
+ case BYTE_ARR:
+ writeByteArray((byte[])val);
+
+ break;
+
+ case SHORT_ARR:
+ writeShortArray((short[])val);
+
+ break;
+
+ case INT_ARR:
+ writeIntArray((int[])val);
+
+ break;
+
+ case LONG_ARR:
+ writeLongArray((long[])val);
+
+ break;
+
+ case FLOAT_ARR:
+ writeFloatArray((float[])val);
+
+ break;
+
+ case DOUBLE_ARR:
+ writeDoubleArray((double[])val);
+
+ break;
+
+ case CHAR_ARR:
+ writeCharArray((char[])val);
+
+ break;
+
+ case BOOLEAN_ARR:
+ writeBooleanArray((boolean[])val);
+
+ break;
+
+ case STRING:
+ writeString((String)val);
+
+ break;
+
+ case BIT_SET:
+ writeBitSet((BitSet)val);
+
+ break;
+
+ case UUID:
+ writeUuid((UUID)val);
+
+ break;
+
+ case IGNITE_UUID:
+ writeIgniteUuid((IgniteUuid)val);
+
+ break;
+
+ case MSG:
+ try {
+ if (val != null) // The before/after hooks are skipped for a null message.
+ writer.beforeInnerMessageWrite();
+
+ writeMessage((Message)val, writer);
+ }
+ finally {
+ if (val != null)
+ writer.afterInnerMessageWrite(lastFinished); // Runs even if writeMessage throws.
+ }
+
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown type: " + type);
+ }
+ }
+
+ /**
+ * @param type Item type; selects which {@code read*} method is called.
+ * @param reader Reader; passed on only for {@code MSG} items.
+ * @return Value returned by the selected {@code read*} method (primitive results are boxed by the {@code Object} return type).
+ */
+ private Object read(MessageCollectionItemType type, MessageReader reader) {
+ switch (type) {
+ case BYTE:
+ return readByte();
+
+ case SHORT:
+ return readShort();
+
+ case INT:
+ return readInt();
+
+ case LONG:
+ return readLong();
+
+ case FLOAT:
+ return readFloat();
+
+ case DOUBLE:
+ return readDouble();
+
+ case CHAR:
+ return readChar();
+
+ case BOOLEAN:
+ return readBoolean();
+
+ case BYTE_ARR:
+ return readByteArray();
+
+ case SHORT_ARR:
+ return readShortArray();
+
+ case INT_ARR:
+ return readIntArray();
+
+ case LONG_ARR:
+ return readLongArray();
+
+ case FLOAT_ARR:
+ return readFloatArray();
+
+ case DOUBLE_ARR:
+ return readDoubleArray();
+
+ case CHAR_ARR:
+ return readCharArray();
+
+ case BOOLEAN_ARR:
+ return readBooleanArray();
+
+ case STRING:
+ return readString();
+
+ case BIT_SET:
+ return readBitSet();
+
+ case UUID:
+ return readUuid();
+
+ case IGNITE_UUID:
+ return readIgniteUuid();
+
+ case MSG:
+ return readMessage(reader);
+
+ default:
+ throw new IllegalArgumentException("Unknown type: " + type);
+ }
+ }
+
+ /**
+ * Array creator: builds a new array of type {@code T} for a requested length.
+ */
+ private interface ArrayCreator<T> {
+ /**
+ * @param len Array length. NOTE(review): an earlier doc mentioned {@code -1}; {@code readArray} returns before calling this for -1 -- confirm other callers.
+ * @return New array.
+ */
+ public T create(int len);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 42f8dae..d4e2a43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -111,7 +111,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
public static final String DIRECT_PROTO_VER_ATTR = "comm.direct.proto.ver";
/** Direct protocol version. */
- public static final byte DIRECT_PROTO_VER = 2;
+ public static final byte DIRECT_PROTO_VER = 3;
/** Listeners by topic. */
private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 845e204..3a7bc8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -103,9 +103,6 @@ import static org.jsr166.ConcurrentLinkedDeque8.Node;
* Cache eviction manager.
*/
public class GridCacheEvictionManager extends GridCacheManagerAdapter {
- /** Unsafe instance. */
- private static final sun.misc.Unsafe unsafe = GridUnsafe.unsafe();
-
/** Attribute name used to queue node in entry metadata. */
private static final int META_KEY = GridMetadataAwareAdapter.EntryKey.CACHE_EVICTION_MANAGER_KEY.key();
@@ -985,7 +982,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
continue;
// Lock entry.
- unsafe.monitorEnter(entry);
+ GridUnsafe.monitorEnter(entry);
locked.add(entry);
@@ -1028,7 +1025,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
for (ListIterator<GridCacheEntryEx> it = locked.listIterator(locked.size()); it.hasPrevious();) {
GridCacheEntryEx e = it.previous();
- unsafe.monitorExit(e);
+ GridUnsafe.monitorExit(e);
}
// Remove entries and fire events outside the locks.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java
index ea036af..82e115c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java
@@ -23,7 +23,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import sun.misc.Unsafe;
/**
* GridCacheSwapEntry over offheap pointer.
@@ -41,9 +40,6 @@ import sun.misc.Unsafe;
*/
public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
/** */
- private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
- /** */
private final long ptr;
/** */
@@ -70,17 +66,17 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
long readPtr = ptr + GridCacheSwapEntryImpl.VERSION_OFFSET;
- boolean verEx = UNSAFE.getByte(readPtr++) != 0;
+ boolean verEx = GridUnsafe.getByte(readPtr++) != 0;
ver = U.readVersion(readPtr, verEx);
readPtr += verEx ? GridCacheSwapEntryImpl.VERSION_EX_SIZE : GridCacheSwapEntryImpl.VERSION_SIZE;
- type = UNSAFE.getByte(readPtr + 4);
+ type = GridUnsafe.getByte(readPtr + 4);
valPtr = readPtr;
- assert (ptr + size) > (UNSAFE.getInt(valPtr) + valPtr + 5);
+ assert (ptr + size) > (GridUnsafe.getInt(valPtr) + valPtr + 5);
}
/**
@@ -94,11 +90,11 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
ptr += GridCacheSwapEntryImpl.VERSION_OFFSET; // Skip ttl, expire time.
- boolean verEx = UNSAFE.getByte(ptr++) != 0;
+ boolean verEx = GridUnsafe.getByte(ptr++) != 0;
ptr += verEx ? GridCacheSwapEntryImpl.VERSION_EX_SIZE : GridCacheSwapEntryImpl.VERSION_SIZE;
- assert (ptr + size) > (UNSAFE.getInt(ptr) + ptr + 5);
+ assert (ptr + size) > (GridUnsafe.getInt(ptr) + ptr + 5);
return ptr;
}
@@ -108,7 +104,7 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
* @return TTL.
*/
public static long timeToLive(long ptr) {
- return UNSAFE.getLong(ptr);
+ return GridUnsafe.getLong(ptr);
}
/**
@@ -116,7 +112,7 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
* @return Expire time.
*/
public static long expireTime(long ptr) {
- return UNSAFE.getLong(ptr + GridCacheSwapEntryImpl.EXPIRE_TIME_OFFSET);
+ return GridUnsafe.getLong(ptr + GridCacheSwapEntryImpl.EXPIRE_TIME_OFFSET);
}
/**
@@ -126,7 +122,7 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
public static GridCacheVersion version(long ptr) {
long addr = ptr + GridCacheSwapEntryImpl.VERSION_OFFSET;
- boolean verEx = UNSAFE.getByte(addr) != 0;
+ boolean verEx = GridUnsafe.getByte(addr) != 0;
addr++;
@@ -165,12 +161,12 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry {
/** {@inheritDoc} */
@Override public long ttl() {
- return UNSAFE.getLong(ptr);
+ return GridUnsafe.getLong(ptr);
}
/** {@inheritDoc} */
@Override public long expireTime() {
- return UNSAFE.getLong(ptr + GridCacheSwapEntryImpl.EXPIRE_TIME_OFFSET);
+ return GridUnsafe.getLong(ptr + GridCacheSwapEntryImpl.EXPIRE_TIME_OFFSET);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
index 6b1266f..4a69822 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
@@ -26,19 +26,12 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import sun.misc.Unsafe;
/**
* Swap entry.
*/
public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
/** */
- private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
- /** */
- private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
-
- /** */
static final int EXPIRE_TIME_OFFSET = 8;
/** */
@@ -108,7 +101,7 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
* @return TTL.
*/
public static long timeToLive(byte[] bytes) {
- return UNSAFE.getLong(bytes, BYTE_ARR_OFF);
+ return GridUnsafe.getLongAligned(bytes, GridUnsafe.BYTE_ARR_OFF);
}
/**
@@ -116,7 +109,7 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
* @return Expire time.
*/
public static long expireTime(byte[] bytes) {
- return UNSAFE.getLong(bytes, BYTE_ARR_OFF + EXPIRE_TIME_OFFSET);
+ return GridUnsafe.getLongAligned(bytes, GridUnsafe.BYTE_ARR_OFF + EXPIRE_TIME_OFFSET);
}
/**
@@ -124,9 +117,9 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
* @return Version.
*/
public static GridCacheVersion version(byte[] bytes) {
- long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
+ long off = GridUnsafe.BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
- boolean verEx = UNSAFE.getByte(bytes, off++) != 0;
+ boolean verEx = GridUnsafe.getByte(bytes, off++) != 0;
return U.readVersion(bytes, off, verEx);
}
@@ -136,21 +129,21 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
* @return Value if value is byte array, otherwise {@code null}.
*/
@Nullable public static IgniteBiTuple<byte[], Byte> getValue(byte[] bytes) {
- long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
+ long off = GridUnsafe.BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
- boolean verEx = UNSAFE.getByte(bytes, off++) != 0;
+ boolean verEx = GridUnsafe.getByte(bytes, off++) != 0;
off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
- int arrLen = UNSAFE.getInt(bytes, off);
+ int arrLen = GridUnsafe.getIntAligned(bytes, off);
off += 4;
- byte type = UNSAFE.getByte(bytes, off++);
+ byte type = GridUnsafe.getByte(bytes, off++);
byte[] valBytes = new byte[arrLen];
- UNSAFE.copyMemory(bytes, off, valBytes, BYTE_ARR_OFF, arrLen);
+ GridUnsafe.copyMemory(bytes, off, valBytes, GridUnsafe.BYTE_ARR_OFF, arrLen);
return new IgniteBiTuple<>(valBytes, type);
}
@@ -235,25 +228,25 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
byte[] arr = new byte[size];
- long off = BYTE_ARR_OFF;
+ long off = GridUnsafe.BYTE_ARR_OFF;
- UNSAFE.putLong(arr, off, ttl);
+ GridUnsafe.putLongAligned(arr, off, ttl);
off += 8;
- UNSAFE.putLong(arr, off, expireTime);
+ GridUnsafe.putLongAligned(arr, off, expireTime);
off += 8;
off = U.writeVersion(arr, off, ver);
- UNSAFE.putInt(arr, off, len);
+ GridUnsafe.putIntAligned(arr, off, len);
off += 4;
- UNSAFE.putByte(arr, off++, type);
+ GridUnsafe.putByte(arr, off++, type);
- UNSAFE.copyMemory(valBytes.array(), BYTE_ARR_OFF, arr, off, len);
+ GridUnsafe.copyMemory(valBytes.array(), GridUnsafe.BYTE_ARR_OFF, arr, off, len);
off += len;
@@ -271,21 +264,21 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
*/
public static GridCacheSwapEntryImpl unmarshal(byte[] arr, boolean valOnly) {
if (valOnly) {
- long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
+ long off = GridUnsafe.BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
- boolean verEx = UNSAFE.getByte(arr, off++) != 0;
+ boolean verEx = GridUnsafe.getByte(arr, off++) != 0;
off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
- int arrLen = UNSAFE.getInt(arr, off);
+ int arrLen = GridUnsafe.getIntAligned(arr, off);
off += 4;
- byte type = UNSAFE.getByte(arr, off++);
+ byte type = GridUnsafe.getByte(arr, off++);
byte[] valBytes = new byte[arrLen];
- UNSAFE.copyMemory(arr, off, valBytes, BYTE_ARR_OFF, arrLen);
+ GridUnsafe.copyMemory(arr, off, valBytes, GridUnsafe.BYTE_ARR_OFF, arrLen);
return new GridCacheSwapEntryImpl(ByteBuffer.wrap(valBytes),
type,
@@ -296,31 +289,31 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
null);
}
- long off = BYTE_ARR_OFF;
+ long off = GridUnsafe.BYTE_ARR_OFF;
- long ttl = UNSAFE.getLong(arr, off);
+ long ttl = GridUnsafe.getLongAligned(arr, off);
off += 8;
- long expireTime = UNSAFE.getLong(arr, off);
+ long expireTime = GridUnsafe.getLongAligned(arr, off);
off += 8;
- boolean verEx = UNSAFE.getBoolean(arr, off++);
+ boolean verEx = GridUnsafe.getBoolean(arr, off++);
GridCacheVersion ver = U.readVersion(arr, off, verEx);
off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
- int arrLen = UNSAFE.getInt(arr, off);
+ int arrLen = GridUnsafe.getIntAligned(arr, off);
off += 4;
- byte type = UNSAFE.getByte(arr, off++);
+ byte type = GridUnsafe.getByte(arr, off++);
byte[] valBytes = new byte[arrLen];
- UNSAFE.copyMemory(arr, off, valBytes, BYTE_ARR_OFF, arrLen);
+ GridUnsafe.copyMemory(arr, off, valBytes, GridUnsafe.BYTE_ARR_OFF, arrLen);
off += arrLen;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index c9d6dad..9d95c2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -48,7 +48,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryMetadata;
@@ -56,13 +56,14 @@ import org.apache.ignite.internal.binary.BinaryMetadataHandler;
import org.apache.ignite.internal.binary.BinaryObjectEx;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl;
+import org.apache.ignite.internal.binary.BinaryPrimitives;
import org.apache.ignite.internal.binary.BinaryTypeImpl;
-import org.apache.ignite.internal.binary.GridBinaryMarshaller;
-import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapter;
@@ -95,7 +96,6 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.marshaller.Marshaller;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import sun.misc.Unsafe;
/**
* Binary processor implementation.
@@ -103,9 +103,6 @@ import sun.misc.Unsafe;
public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorImpl implements
CacheObjectBinaryProcessor {
/** */
- private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
- /** */
private final CountDownLatch startLatch = new CountDownLatch(1);
/** */
@@ -356,11 +353,11 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
public Object unmarshal(long ptr, boolean forceHeap) throws BinaryObjectException {
assert ptr > 0 : ptr;
- int size = UNSAFE.getInt(ptr);
+ int size = GridUnsafe.getInt(ptr);
ptr += 4;
- byte type = UNSAFE.getByte(ptr++);
+ byte type = GridUnsafe.getByte(ptr++);
if (type != CacheObject.TYPE_BYTE_ARR) {
assert size > 0 : size;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 81fd5d6..966c10a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -137,9 +137,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private static final int DEFERRED_UPDATE_RESPONSE_TIMEOUT =
Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
- /** Unsafe instance. */
- private static final sun.misc.Unsafe UNSAFE = GridUnsafe.unsafe();
-
/** Update reply closure. */
private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
@@ -2364,10 +2361,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
GridDhtCacheEntry entry = entryExx(key, topVer);
- UNSAFE.monitorEnter(entry);
+ GridUnsafe.monitorEnter(entry);
if (entry.obsolete())
- UNSAFE.monitorExit(entry);
+ GridUnsafe.monitorExit(entry);
else
return Collections.singletonList(entry);
}
@@ -2407,13 +2404,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (entry == null)
continue;
- UNSAFE.monitorEnter(entry);
+ GridUnsafe.monitorEnter(entry);
if (entry.obsolete()) {
// Unlock all locked.
for (int j = 0; j <= i; j++) {
if (locked.get(j) != null)
- UNSAFE.monitorExit(locked.get(j));
+ GridUnsafe.monitorExit(locked.get(j));
}
// Clear entries.
@@ -2462,7 +2459,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// That's why releasing locks in the finally block..
for (GridCacheMapEntry entry : locked) {
if (entry != null)
- UNSAFE.monitorExit(entry);
+ GridUnsafe.monitorExit(entry);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 6130ead..c4877f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -89,9 +89,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
/** */
private static final long serialVersionUID = 0L;
- /** Unsafe instance. */
- private static final sun.misc.Unsafe UNSAFE = GridUnsafe.unsafe();
-
/** */
private GridCachePreloader preldr;
@@ -1506,12 +1503,12 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
for (int i = 0; i < locked.size(); i++) {
GridCacheEntryEx entry = locked.get(i);
- UNSAFE.monitorEnter(entry);
+ GridUnsafe.monitorEnter(entry);
if (entry.obsolete()) {
// Unlock all locked.
for (int j = 0; j <= i; j++)
- UNSAFE.monitorExit(locked.get(j));
+ GridUnsafe.monitorExit(locked.get(j));
// Clear entries.
locked.clear();
@@ -1542,7 +1539,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
*/
private void unlockEntries(Iterable<GridCacheEntryEx> locked) {
for (GridCacheEntryEx entry : locked)
- UNSAFE.monitorExit(entry);
+ GridUnsafe.monitorExit(entry);
AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 5b764b6..54dd69e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -51,9 +51,6 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
*
*/
public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter implements IgniteCacheObjectProcessor {
- /** */
- private static final sun.misc.Unsafe UNSAFE = GridUnsafe.unsafe();
-
/** Immutable classes. */
private static final Collection<Class<?>> IMMUTABLE_CLS = new HashSet<>();
@@ -138,9 +135,9 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
{
assert valPtr != 0;
- int size = UNSAFE.getInt(valPtr);
+ int size = GridUnsafe.getInt(valPtr);
- byte type = UNSAFE.getByte(valPtr + 4);
+ byte type = GridUnsafe.getByte(valPtr + 4);
byte[] bytes = U.copyMemory(valPtr + 5, size);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2572a545/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
index 2f6ad5c..b16f42f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.processors.platform.memory;
-import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.UNSAFE;
+import org.apache.ignite.internal.util.GridUnsafe;
/**
* Interop output stream implementation working with BIG ENDIAN architecture.
@@ -46,7 +46,7 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl
long startPos = data + pos;
for (short item : val) {
- UNSAFE.putShort(startPos, Short.reverseBytes(item));
+ GridUnsafe.putShort(startPos, Short.reverseBytes(item));
startPos += 2;
}
@@ -68,7 +68,7 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl
long startPos = data + pos;
for (char item : val) {
- UNSAFE.putChar(startPos, Character.reverseBytes(item));
+ GridUnsafe.putChar(startPos, Character.reverseBytes(item));
startPos += 2;
}
@@ -90,7 +90,7 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl
long startPos = data + pos;
for (int item : val) {
- UNSAFE.putInt(startPos, Integer.reverseBytes(item));
+ GridUnsafe.putInt(startPos, Integer.reverseBytes(item));
startPos += 4;
}
@@ -117,7 +117,7 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl
long startPos = data + pos;
for (float item : val) {
- UNSAFE.putInt(startPos, Integer.reverseBytes(Float.floatToIntBits(item)));
+ GridUnsafe.putInt(startPos, Integer.reverseBytes(Float.floatToIntBits(item)));
startPos += 4;
}
@@ -139,7 +139,7 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl
long startPos = data + pos;
for (long item : val) {
- UNSAFE.putLong(startPos, Long.reverseBytes(item));
+ GridUnsafe.putLong(startPos, Long.reverseBytes(item));
startPos += 8;
}
@@ -156,7 +156,7 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl
long startPos = data + pos;
for (double item : val) {
- UNSAFE.putLong(startPos, Long.reverseBytes(Double.doubleToLongBits(item)));
+ GridUnsafe.putLong(startPos, Long.reverseBytes(Double.doubleToLongBits(item)));
startPos += 8;
}