You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/14 15:10:20 UTC
[05/55] [abbrv] ignite git commit: ignite-2065: rename "portable"
classes to "binary"
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableValueWithType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableValueWithType.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableValueWithType.java
deleted file mode 100644
index 6c5ddfe..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableValueWithType.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.binary.builder;
-
-import org.apache.ignite.internal.binary.BinaryWriterExImpl;
-import org.apache.ignite.internal.binary.BinaryWriterExImpl;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- *
- */
-class PortableValueWithType implements PortableLazyValue {
- /** */
- private byte type;
-
- /** */
- private Object val;
-
- /**
- * @param type Type
- * @param val Value.
- */
- PortableValueWithType(byte type, Object val) {
- this.type = type;
- this.val = val;
- }
-
- /** {@inheritDoc} */
- @Override public void writeTo(BinaryWriterExImpl writer, PortableBuilderSerializer ctx) {
- if (val instanceof PortableBuilderSerializationAware)
- ((PortableBuilderSerializationAware)val).writeTo(writer, ctx);
- else
- ctx.writeValue(writer, val);
- }
-
- /**
- * @return Type ID.
- */
- public int typeId() {
- return type;
- }
-
- /** {@inheritDoc} */
- @Override public Object value() {
- if (val instanceof PortableLazyValue)
- return ((PortableLazyValue)val).value();
-
- return val;
- }
-
- /**
- * @param val New value.
- */
- public void value(Object val) {
- this.val = val;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(PortableValueWithType.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java
new file mode 100644
index 0000000..e3be794
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary.streams;
+
+import org.apache.ignite.binary.BinaryObjectException;
+
+/**
+ * Portable abstract input stream.
+ */
+public abstract class BinaryAbstractInputStream extends BinaryAbstractStream
+ implements BinaryInputStream {
+ /** Length of data inside array. */
+ protected int len;
+
+ /** {@inheritDoc} */
+ @Override public byte readByte() {
+ ensureEnoughData(1);
+
+ return readByteAndShift();
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] readByteArray(int cnt) {
+ ensureEnoughData(cnt);
+
+ byte[] res = new byte[cnt];
+
+ copyAndShift(res, BYTE_ARR_OFF, cnt);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readBoolean() {
+ return readByte() == BYTE_ONE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean[] readBooleanArray(int cnt) {
+ ensureEnoughData(cnt);
+
+ boolean[] res = new boolean[cnt];
+
+ copyAndShift(res, BOOLEAN_ARR_OFF, cnt);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short readShort() {
+ ensureEnoughData(2);
+
+ short res = readShortFast();
+
+ shift(2);
+
+ if (!LITTLE_ENDIAN)
+ res = Short.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short[] readShortArray(int cnt) {
+ int len = cnt << 1;
+
+ ensureEnoughData(len);
+
+ short[] res = new short[cnt];
+
+ copyAndShift(res, SHORT_ARR_OFF, len);
+
+ if (!LITTLE_ENDIAN) {
+ for (int i = 0; i < res.length; i++)
+ res[i] = Short.reverseBytes(res[i]);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public char readChar() {
+ ensureEnoughData(2);
+
+ char res = readCharFast();
+
+ shift(2);
+
+ if (!LITTLE_ENDIAN)
+ res = Character.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public char[] readCharArray(int cnt) {
+ int len = cnt << 1;
+
+ ensureEnoughData(len);
+
+ char[] res = new char[cnt];
+
+ copyAndShift(res, CHAR_ARR_OFF, len);
+
+ if (!LITTLE_ENDIAN) {
+ for (int i = 0; i < res.length; i++)
+ res[i] = Character.reverseBytes(res[i]);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readInt() {
+ ensureEnoughData(4);
+
+ int res = readIntFast();
+
+ shift(4);
+
+ if (!LITTLE_ENDIAN)
+ res = Integer.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] readIntArray(int cnt) {
+ int len = cnt << 2;
+
+ ensureEnoughData(len);
+
+ int[] res = new int[cnt];
+
+ copyAndShift(res, INT_ARR_OFF, len);
+
+ if (!LITTLE_ENDIAN) {
+ for (int i = 0; i < res.length; i++)
+ res[i] = Integer.reverseBytes(res[i]);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte readBytePositioned(int pos) {
+ int delta = pos + 1 - this.pos;
+
+ if (delta > 0)
+ ensureEnoughData(delta);
+
+ return readBytePositioned0(pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short readShortPositioned(int pos) {
+ int delta = pos + 2 - this.pos;
+
+ if (delta > 0)
+ ensureEnoughData(delta);
+
+ return readShortPositioned0(pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readIntPositioned(int pos) {
+ int delta = pos + 4 - this.pos;
+
+ if (delta > 0)
+ ensureEnoughData(delta);
+
+ return readIntPositioned0(pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public float readFloat() {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ /** {@inheritDoc} */
+ @Override public float[] readFloatArray(int cnt) {
+ int len = cnt << 2;
+
+ ensureEnoughData(len);
+
+ float[] res = new float[cnt];
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(res, FLOAT_ARR_OFF, len);
+ else {
+ for (int i = 0; i < res.length; i++) {
+ int x = readIntFast();
+
+ shift(4);
+
+ res[i] = Float.intBitsToFloat(Integer.reverseBytes(x));
+ }
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long readLong() {
+ ensureEnoughData(8);
+
+ long res = readLongFast();
+
+ shift(8);
+
+ if (!LITTLE_ENDIAN)
+ res = Long.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long[] readLongArray(int cnt) {
+ int len = cnt << 3;
+
+ ensureEnoughData(len);
+
+ long[] res = new long[cnt];
+
+ copyAndShift(res, LONG_ARR_OFF, len);
+
+ if (!LITTLE_ENDIAN) {
+ for (int i = 0; i < res.length; i++)
+ res[i] = Long.reverseBytes(res[i]);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public double readDouble() {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ /** {@inheritDoc} */
+ @Override public double[] readDoubleArray(int cnt) {
+ int len = cnt << 3;
+
+ ensureEnoughData(len);
+
+ double[] res = new double[cnt];
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(res, DOUBLE_ARR_OFF, len);
+ else {
+ for (int i = 0; i < res.length; i++) {
+ long x = readLongFast();
+
+ shift(8);
+
+ res[i] = Double.longBitsToDouble(Long.reverseBytes(x));
+ }
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(byte[] arr, int off, int len) {
+ // Clamp the request to what is still available in the stream.
+ int cnt = Math.min(len, remaining());
+
+ copyAndShift(arr, BYTE_ARR_OFF + off, cnt);
+
+ return cnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void position(int pos) {
+ // The end of the data is the current position plus whatever remains to be read.
+ if (pos > remaining() + this.pos)
+ throw new BinaryObjectException("Position is out of bounds: " + pos);
+ this.pos = pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long offheapPointer() {
+ return 0;
+ }
+
+ /**
+ * Ensure that there is enough data.
+ *
+ * @param cnt Length.
+ */
+ protected void ensureEnoughData(int cnt) {
+ if (remaining() < cnt)
+ throw new BinaryObjectException("Not enough data to read the value [position=" + pos +
+ ", requiredBytes=" + cnt + ", remainingBytes=" + remaining() + ']');
+ }
+
+ /**
+ * Read next byte from the stream and perform shift.
+ *
+ * @return Next byte.
+ */
+ protected abstract byte readByteAndShift();
+
+ /**
+ * Copy data to the target object and shift position afterwards.
+ *
+ * @param target Target object to copy into.
+ * @param off Offset in the target (callers pass the array base offset, optionally plus an index).
+ * @param len Number of bytes to copy.
+ */
+ protected abstract void copyAndShift(Object target, long off, int len);
+
+ /**
+ * Read short value (fast path).
+ *
+ * @return Short value.
+ */
+ protected abstract short readShortFast();
+
+ /**
+ * Read char value (fast path).
+ *
+ * @return Char value.
+ */
+ protected abstract char readCharFast();
+
+ /**
+ * Read int value (fast path).
+ *
+ * @return Int value.
+ */
+ protected abstract int readIntFast();
+
+ /**
+ * Read long value (fast path).
+ *
+ * @return Long value.
+ */
+ protected abstract long readLongFast();
+
+ /**
+ * Internal routine for positioned byte value read.
+ *
+ * @param pos Position.
+ * @return Byte value.
+ */
+ protected abstract byte readBytePositioned0(int pos);
+
+ /**
+ * Internal routine for positioned short value read.
+ *
+ * @param pos Position.
+ * @return Short value.
+ */
+ protected abstract short readShortPositioned0(int pos);
+
+ /**
+ * Internal routine for positioned int value read.
+ *
+ * @param pos Position.
+ * @return Int value.
+ */
+ protected abstract int readIntPositioned0(int pos);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
new file mode 100644
index 0000000..199ee71
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary.streams;
+
+/**
+ * Base portable output stream.
+ */
+public abstract class BinaryAbstractOutputStream extends BinaryAbstractStream
+ implements BinaryOutputStream {
+ /** Minimal capacity when it is reasonable to start doubling resize. */
+ private static final int MIN_CAP = 256;
+
+ /** {@inheritDoc} */
+ @Override public void writeByte(byte val) {
+ ensureCapacity(pos + 1);
+
+ writeByteAndShift(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeByteArray(byte[] val) {
+ ensureCapacity(pos + val.length);
+
+ copyAndShift(val, BYTE_ARR_OFF, val.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBoolean(boolean val) {
+ writeByte(val ? BYTE_ONE : BYTE_ZERO);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBooleanArray(boolean[] val) {
+ ensureCapacity(pos + val.length);
+
+ copyAndShift(val, BOOLEAN_ARR_OFF, val.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeShort(short val) {
+ ensureCapacity(pos + 2);
+
+ if (!LITTLE_ENDIAN)
+ val = Short.reverseBytes(val);
+
+ writeShortFast(val);
+
+ shift(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeShortArray(short[] val) {
+ int cnt = val.length << 1; // Total byte count: 2 bytes per short.
+
+ ensureCapacity(pos + cnt);
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(val, SHORT_ARR_OFF, cnt);
+ else {
+ for (short item : val) {
+ writeShortFast(Short.reverseBytes(item));
+ shift(2); // The fast write does not advance the position (see writeShort), so shift per item.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeChar(char val) {
+ ensureCapacity(pos + 2);
+
+ if (!LITTLE_ENDIAN)
+ val = Character.reverseBytes(val);
+
+ writeCharFast(val);
+
+ shift(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeCharArray(char[] val) {
+ int cnt = val.length << 1; // Total byte count: 2 bytes per char.
+
+ ensureCapacity(pos + cnt);
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(val, CHAR_ARR_OFF, cnt);
+ else {
+ for (char item : val) {
+ writeCharFast(Character.reverseBytes(item));
+ shift(2); // The fast write does not advance the position (see writeChar), so shift per item.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeInt(int val) {
+ ensureCapacity(pos + 4);
+
+ if (!LITTLE_ENDIAN)
+ val = Integer.reverseBytes(val);
+
+ writeIntFast(val);
+
+ shift(4);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeShort(int pos, short val) {
+ ensureCapacity(pos + 2);
+
+ unsafeWriteShort(pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeInt(int pos, int val) {
+ ensureCapacity(pos + 4);
+
+ unsafeWriteInt(pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeIntArray(int[] val) {
+ int cnt = val.length << 2; // Total byte count: 4 bytes per int.
+
+ ensureCapacity(pos + cnt);
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(val, INT_ARR_OFF, cnt);
+ else {
+ for (int item : val) {
+ writeIntFast(Integer.reverseBytes(item));
+ shift(4); // The fast write does not advance the position (see writeInt), so shift per item.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeFloat(float val) {
+ writeInt(Float.floatToIntBits(val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeFloatArray(float[] val) {
+ int cnt = val.length << 2;
+
+ ensureCapacity(pos + cnt);
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(val, FLOAT_ARR_OFF, cnt);
+ else {
+ for (float item : val) {
+ writeIntFast(Integer.reverseBytes(Float.floatToIntBits(item)));
+
+ shift(4);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLong(long val) {
+ ensureCapacity(pos + 8);
+
+ if (!LITTLE_ENDIAN)
+ val = Long.reverseBytes(val);
+
+ writeLongFast(val);
+
+ shift(8);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLongArray(long[] val) {
+ int cnt = val.length << 3; // Total byte count: 8 bytes per long.
+
+ ensureCapacity(pos + cnt);
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(val, LONG_ARR_OFF, cnt);
+ else {
+ for (long item : val) {
+ writeLongFast(Long.reverseBytes(item));
+ shift(8); // The fast write does not advance the position (see writeLong), so shift per item.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeDouble(double val) {
+ writeLong(Double.doubleToLongBits(val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeDoubleArray(double[] val) {
+ int cnt = val.length << 3;
+
+ ensureCapacity(pos + cnt);
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(val, DOUBLE_ARR_OFF, cnt);
+ else {
+ for (double item : val) {
+ writeLongFast(Long.reverseBytes(Double.doubleToLongBits(item)));
+
+ shift(8);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(byte[] arr, int off, int len) {
+ ensureCapacity(pos + len);
+
+ copyAndShift(arr, BYTE_ARR_OFF + off, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(long addr, int cnt) {
+ ensureCapacity(pos + cnt);
+
+ copyAndShift(null, addr, cnt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void position(int pos) {
+ ensureCapacity(pos);
+
+ unsafePosition(pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long offheapPointer() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeEnsure(int cap) {
+ ensureCapacity(pos + cap);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafePosition(int pos) {
+ this.pos = pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteBoolean(boolean val) {
+ unsafeWriteByte(val ? BYTE_ONE : BYTE_ZERO);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteFloat(float val) {
+ unsafeWriteInt(Float.floatToIntBits(val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteDouble(double val) {
+ unsafeWriteLong(Double.doubleToLongBits(val));
+ }
+
+ /**
+ * Compute the capacity a buffer should be grown to.
+ *
+ * @param curCap Current capacity.
+ * @param reqCap Required capacity.
+ * @return New capacity.
+ */
+ protected static int capacity(int curCap, int reqCap) {
+ // Requests below the minimal capacity are rounded up to it.
+ if (reqCap < MIN_CAP)
+ return MIN_CAP;
+
+ // Otherwise try doubling the current capacity first ...
+ int doubled = curCap << 1;
+
+ // ... and fall back to the exact request when doubling is not enough.
+ if (doubled < reqCap)
+ return reqCap;
+
+ return doubled;
+ }
+
+ /**
+ * Write next byte to the stream.
+ *
+ * @param val Value.
+ */
+ protected abstract void writeByteAndShift(byte val);
+
+ /**
+ * Copy data from the source object to the stream and shift position afterwards.
+ *
+ * @param src Source object (callers may pass {@code null} together with a raw address as offset).
+ * @param off Offset in the source (array base offset, optionally plus an index, or a raw address).
+ * @param len Number of bytes to copy.
+ */
+ protected abstract void copyAndShift(Object src, long off, int len);
+
+ /**
+ * Write short value (fast path).
+ *
+ * @param val Short value.
+ */
+ protected abstract void writeShortFast(short val);
+
+ /**
+ * Write char value (fast path).
+ *
+ * @param val Char value.
+ */
+ protected abstract void writeCharFast(char val);
+
+ /**
+ * Write int value (fast path).
+ *
+ * @param val Int value.
+ */
+ protected abstract void writeIntFast(int val);
+
+ /**
+ * Write long value (fast path).
+ *
+ * @param val Long value.
+ */
+ protected abstract void writeLongFast(long val);
+
+ /**
+ * Ensure capacity.
+ *
+ * @param cnt Required byte count.
+ */
+ protected abstract void ensureCapacity(int cnt);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractStream.java
new file mode 100644
index 0000000..ce57631
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractStream.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary.streams;
+
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.util.GridUnsafe;
+import sun.misc.Unsafe;
+
+/**
+ * Portable abstract stream.
+ */
+public abstract class BinaryAbstractStream implements BinaryStream {
+ /** Byte: zero. */
+ protected static final byte BYTE_ZERO = 0;
+
+ /** Byte: one. */
+ protected static final byte BYTE_ONE = 1;
+
+ /** Whether little endian is used on the platform. */
+ protected static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
+
+ /** Unsafe instance. */
+ protected static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+ /** Array offset: boolean. */
+ protected static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
+
+ /** Array offset: byte. */
+ protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+ /** Array offset: short. */
+ protected static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
+
+ /** Array offset: char. */
+ protected static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
+
+ /** Array offset: int. */
+ protected static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
+
+ /** Array offset: float. */
+ protected static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
+
+ /** Array offset: long. */
+ protected static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
+
+ /** Array offset: double. */
+ protected static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
+
+ /** Position. */
+ protected int pos;
+
+ /** {@inheritDoc} */
+ @Override public int position() {
+ return pos;
+ }
+
+ /**
+ * Shift position.
+ *
+ * @param cnt Byte count.
+ */
+ protected void shift(int cnt) {
+ pos += cnt;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java
new file mode 100644
index 0000000..502b9dc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary.streams;
+
+import java.util.Arrays;
+
+/**
+ * Portable heap input stream (backed by a byte array).
+ */
+public final class BinaryHeapInputStream extends BinaryAbstractInputStream {
+ /**
+ * Create stream with pointer set at the given position.
+ *
+ * @param data Data.
+ * @param pos Position.
+ * @return Stream.
+ */
+ public static BinaryHeapInputStream create(byte[] data, int pos) {
+ assert pos < data.length;
+
+ BinaryHeapInputStream stream = new BinaryHeapInputStream(data);
+
+ stream.pos = pos;
+
+ return stream;
+ }
+
+ /** Data. */
+ private byte[] data;
+
+ /**
+ * Constructor.
+ *
+ * @param data Data.
+ */
+ public BinaryHeapInputStream(byte[] data) {
+ this.data = data;
+
+ len = data.length;
+ }
+
+ /**
+ * @return Copy of this stream.
+ */
+ public BinaryHeapInputStream copy() {
+ BinaryHeapInputStream in = new BinaryHeapInputStream(Arrays.copyOf(data, data.length));
+
+ in.position(pos);
+
+ return in;
+ }
+
+ /**
+ * Method called from JNI to resize stream: grows the backing array to {@code len} bytes if it is shorter.
+ *
+ * @param len Required length.
+ * @return Underlying byte array (unchanged if it already had at least {@code len} bytes).
+ */
+ public byte[] resize(int len) {
+ if (data.length < len) {
+ byte[] data0 = new byte[len];
+
+ UNSAFE.copyMemory(data, BYTE_ARR_OFF, data0, BYTE_ARR_OFF, data.length); // Keep the existing content.
+
+ data = data0; // NOTE(review): the inherited 'len' field is not updated here - confirm this is intended.
+ }
+
+ return data;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int remaining() {
+ return data.length - pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] array() {
+ return data;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] arrayCopy() {
+ byte[] res = new byte[len];
+
+ UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, res.length);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasArray() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected byte readByteAndShift() {
+ return data[pos++];
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void copyAndShift(Object target, long off, int len) {
+ UNSAFE.copyMemory(data, BYTE_ARR_OFF + pos, target, off, len);
+
+ shift(len);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected short readShortFast() {
+ return UNSAFE.getShort(data, BYTE_ARR_OFF + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected char readCharFast() {
+ return UNSAFE.getChar(data, BYTE_ARR_OFF + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int readIntFast() {
+ return UNSAFE.getInt(data, BYTE_ARR_OFF + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long readLongFast() {
+ return UNSAFE.getLong(data, BYTE_ARR_OFF + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected byte readBytePositioned0(int pos) {
+ return UNSAFE.getByte(data, BYTE_ARR_OFF + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected short readShortPositioned0(int pos) {
+ short res = UNSAFE.getShort(data, BYTE_ARR_OFF + pos);
+
+ if (!LITTLE_ENDIAN)
+ res = Short.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int readIntPositioned0(int pos) {
+ int res = UNSAFE.getInt(data, BYTE_ARR_OFF + pos);
+
+ if (!LITTLE_ENDIAN)
+ res = Integer.reverseBytes(res);
+
+ return res;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
new file mode 100644
index 0000000..02c3441
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary.streams;
+
+/**
+ * Portable heap output stream.
+ */
+public final class BinaryHeapOutputStream extends BinaryAbstractOutputStream {
+ /** Allocator. */
+ private final BinaryMemoryAllocatorChunk chunk;
+
+ /** Data. */
+ private byte[] data;
+
+ /**
+ * Constructor.
+ *
+ * @param cap Initial capacity.
+ */
+ public BinaryHeapOutputStream(int cap) {
+ this(cap, BinaryMemoryAllocator.INSTANCE.chunk());
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param cap Capacity.
+ * @param chunk Chunk.
+ */
+ public BinaryHeapOutputStream(int cap, BinaryMemoryAllocatorChunk chunk) {
+ this.chunk = chunk;
+
+ data = chunk.allocate(cap);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ chunk.release(data, pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void ensureCapacity(int cnt) {
+ if (cnt > data.length) {
+ int newCap = capacity(data.length, cnt);
+
+ data = chunk.reallocate(data, newCap);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] array() {
+ return data;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] arrayCopy() {
+ byte[] res = new byte[pos];
+
+ UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, pos);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasArray() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeByteAndShift(byte val) {
+ data[pos++] = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void copyAndShift(Object src, long off, int len) {
+ UNSAFE.copyMemory(src, off, data, BYTE_ARR_OFF + pos, len);
+
+ shift(len);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeShortFast(short val) {
+ UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeCharFast(char val) {
+ UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeIntFast(int val) {
+ UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeLongFast(long val) {
+ UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteByte(byte val) {
+ UNSAFE.putByte(data, BYTE_ARR_OFF + pos++, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteShort(short val) {
+ if (!LITTLE_ENDIAN)
+ val = Short.reverseBytes(val);
+
+ UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val);
+
+ shift(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteShort(int pos, short val) {
+ if (!LITTLE_ENDIAN)
+ val = Short.reverseBytes(val);
+
+ UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteChar(char val) {
+ if (!LITTLE_ENDIAN)
+ val = Character.reverseBytes(val);
+
+ UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val);
+
+ shift(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteInt(int val) {
+ if (!LITTLE_ENDIAN)
+ val = Integer.reverseBytes(val);
+
+ UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
+
+ shift(4);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteInt(int pos, int val) {
+ if (!LITTLE_ENDIAN)
+ val = Integer.reverseBytes(val);
+
+ UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteLong(long val) {
+ if (!LITTLE_ENDIAN)
+ val = Long.reverseBytes(val);
+
+ UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val);
+
+ shift(8);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryInputStream.java
new file mode 100644
index 0000000..63457e4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryInputStream.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary.streams;
+
+import org.apache.ignite.internal.binary.BinaryPositionReadable;
+
+/**
+ * Binary input stream.
+ */
+public interface BinaryInputStream extends BinaryStream, BinaryPositionReadable {
+ /**
+ * Read byte value.
+ *
+ * @return Byte value.
+ */
+ public byte readByte();
+
+ /**
+ * Read byte array.
+ *
+ * @param cnt Expected item count.
+ * @return Byte array.
+ */
+ public byte[] readByteArray(int cnt);
+
+ /**
+ * Reads up to {@code cnt} bytes into the given byte array.
+ *
+ * @param arr Destination array.
+ * @param off Offset in {@code arr} at which the first byte is stored.
+ * @param cnt Maximum number of bytes to read.
+ * @return Actual number of bytes read.
+ */
+ public int read(byte[] arr, int off, int cnt);
+
+ /**
+ * Read boolean value.
+ *
+ * @return Boolean value.
+ */
+ public boolean readBoolean();
+
+ /**
+ * Read boolean array.
+ *
+ * @param cnt Expected item count.
+ * @return Boolean array.
+ */
+ public boolean[] readBooleanArray(int cnt);
+
+ /**
+ * Read short value.
+ *
+ * @return Short value.
+ */
+ public short readShort();
+
+ /**
+ * Read short array.
+ *
+ * @param cnt Expected item count.
+ * @return Short array.
+ */
+ public short[] readShortArray(int cnt);
+
+ /**
+ * Read char value.
+ *
+ * @return Char value.
+ */
+ public char readChar();
+
+ /**
+ * Read char array.
+ *
+ * @param cnt Expected item count.
+ * @return Char array.
+ */
+ public char[] readCharArray(int cnt);
+
+ /**
+ * Read int value.
+ *
+ * @return Int value.
+ */
+ public int readInt();
+
+ /**
+ * Read int array.
+ *
+ * @param cnt Expected item count.
+ * @return Int array.
+ */
+ public int[] readIntArray(int cnt);
+
+ /**
+ * Read float value.
+ *
+ * @return Float value.
+ */
+ public float readFloat();
+
+ /**
+ * Read float array.
+ *
+ * @param cnt Expected item count.
+ * @return Float array.
+ */
+ public float[] readFloatArray(int cnt);
+
+ /**
+ * Read long value.
+ *
+ * @return Long value.
+ */
+ public long readLong();
+
+ /**
+ * Read long array.
+ *
+ * @param cnt Expected item count.
+ * @return Long array.
+ */
+ public long[] readLongArray(int cnt);
+
+ /**
+ * Read double value.
+ *
+ * @return Double value.
+ */
+ public double readDouble();
+
+ /**
+ * Read double array.
+ *
+ * @param cnt Expected item count.
+ * @return Double array.
+ */
+ public double[] readDoubleArray(int cnt);
+
+ /**
+ * Gets amount of remaining data in bytes.
+ *
+ * @return Remaining data.
+ */
+ public int remaining();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java
new file mode 100644
index 0000000..5471bc5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary.streams;
+
+/**
+ * Singleton allocator that hands out one {@link BinaryMemoryAllocatorChunk} per thread.
+ */
+public final class BinaryMemoryAllocator {
+ /** Memory allocator instance. */
+ public static final BinaryMemoryAllocator INSTANCE = new BinaryMemoryAllocator();
+
+ /** Per-thread chunk holders. */
+ private static final ThreadLocal<BinaryMemoryAllocatorChunk> holders = new ThreadLocal<>();
+
+ /**
+ * Private constructor: {@link #INSTANCE} is the only instance.
+ */
+ private BinaryMemoryAllocator() {
+ // No-op.
+ }
+
+ /**
+ * Gets the chunk bound to the calling thread, creating and registering it on first use.
+ *
+ * @return Chunk of the current thread, never {@code null}.
+ */
+ public BinaryMemoryAllocatorChunk chunk() {
+ BinaryMemoryAllocatorChunk cur = holders.get();
+
+ if (cur != null)
+ return cur;
+
+ BinaryMemoryAllocatorChunk created = new BinaryMemoryAllocatorChunk();
+
+ holders.set(created);
+
+ return created;
+ }
+
+ /**
+ * Tells whether the calling thread's chunk is currently acquired.
+ * The function is used by Unit tests.
+ *
+ * @return {@code true} if the thread has a chunk and it is acquired, {@code false} otherwise.
+ */
+ public boolean isAcquired() {
+ BinaryMemoryAllocatorChunk cur = holders.get();
+
+ if (cur == null)
+ return false;
+
+ return cur.isAcquired();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java
new file mode 100644
index 0000000..7c73742
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary.streams;
+
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import sun.misc.Unsafe;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
+
+/**
+ * Memory allocator chunk: a cached byte array that is handed out to one user at a time.
+ * <p>
+ * While the cached array is acquired, further {@link #allocate(int)} calls return fresh arrays
+ * that are not tracked by this chunk.
+ * <p>
+ * NOTE(review): no synchronization is performed here; instances appear to be confined to a single
+ * thread by {@code BinaryMemoryAllocator} - confirm before sharing a chunk between threads.
+ */
+public class BinaryMemoryAllocatorChunk {
+ /** Unsafe instance. */
+ protected static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+ /** Array offset: byte. */
+ protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+ /**
+ * Buffer size re-check frequency, compared against {@code U.currentTimeMillis()} deltas.
+ * Kept as a primitive so that {@link #release(byte[], int)} does not unbox on every call.
+ */
+ private static final long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, 10000);
+
+ /** Data array */
+ private byte[] data;
+
+ /**
+ * Largest message size reported to {@link #release(byte[], int)} so far.
+ * NOTE(review): this value is never reset after a size check, so it is a lifetime maximum
+ * rather than a per-interval one - confirm that this is intended.
+ */
+ private int maxMsgSize;
+
+ /** Last time array size is checked. */
+ private long lastCheck = U.currentTimeMillis();
+
+ /** Whether the holder is acquired or not. */
+ private boolean acquired;
+
+ /**
+ * Allocate.
+ * <p>
+ * If the chunk is already acquired, a fresh array of exactly {@code size} bytes is returned and
+ * the chunk state is left untouched. Otherwise the chunk becomes acquired and the cached array
+ * is returned; it is replaced first when it is missing or shorter than {@code size}, so the
+ * returned array may be longer than requested.
+ *
+ * @param size Desired size.
+ * @return Data.
+ */
+ public byte[] allocate(int size) {
+ if (acquired)
+ return new byte[size];
+
+ acquired = true;
+
+ if (data == null || size > data.length)
+ data = new byte[size];
+
+ return data;
+ }
+
+ /**
+ * Reallocate.
+ * <p>
+ * Creates a new array of {@code size} bytes and copies the old content into it. If the old
+ * array is the one cached by this chunk, the cache is switched to the new array.
+ *
+ * @param data Old data.
+ * @param size Size.
+ * @return New data.
+ */
+ public byte[] reallocate(byte[] data, int size) {
+ byte[] newData = new byte[size];
+
+ if (this.data == data)
+ this.data = newData;
+
+ // Never copy more than the new array can hold: a smaller size would otherwise write past
+ // its end. A bounds-checked copy is used instead of raw memory access for the same reason.
+ System.arraycopy(data, 0, newData, 0, Math.min(data.length, size));
+
+ return newData;
+ }
+
+ /**
+ * Releases the cached array and shrinks it if needed.
+ * <p>
+ * The call is ignored when {@code data} is not the array cached by this chunk. Otherwise the
+ * chunk is marked as free and, once at least the re-check interval has passed since the last
+ * check, the cached array is replaced with one of half the length if the largest reported
+ * message size is below that half.
+ *
+ * @param data Array being released.
+ * @param maxMsgSize Message size reported by the caller for this usage.
+ */
+ public void release(byte[] data, int maxMsgSize) {
+ if (this.data != data)
+ return;
+
+ if (maxMsgSize > this.maxMsgSize)
+ this.maxMsgSize = maxMsgSize;
+
+ this.acquired = false;
+
+ long now = U.currentTimeMillis();
+
+ if (now - this.lastCheck >= CHECK_FREQ) {
+ int halfSize = data.length >> 1;
+
+ if (this.maxMsgSize < halfSize)
+ this.data = new byte[halfSize];
+
+ this.lastCheck = now;
+ }
+ }
+
+ /**
+ * @return {@code True} if acquired.
+ */
+ public boolean isAcquired() {
+ return acquired;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java
new file mode 100644
index 0000000..dc18c9e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary.streams;
+
+/**
+ * Binary off-heap input stream.
+ * <p>
+ * Reads directly from the native memory region that starts at the pointer passed to the
+ * constructor. This class never frees that memory.
+ */
+public class BinaryOffheapInputStream extends BinaryAbstractInputStream {
+ /** Pointer. */
+ private final long ptr;
+
+ /** Capacity. */
+ private final int cap;
+
+ /** If {@code true}, {@link #offheapPointer()} reports {@code 0} instead of the real pointer. */
+ private final boolean forceHeap;
+
+ /**
+ * Constructor.
+ *
+ * @param ptr Pointer.
+ * @param cap Capacity.
+ */
+ public BinaryOffheapInputStream(long ptr, int cap) {
+ this(ptr, cap, false);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param ptr Pointer.
+ * @param cap Capacity.
+ * @param forceHeap If {@code true} method {@link #offheapPointer} returns 0 and unmarshalling will
+ * create heap-based objects.
+ */
+ public BinaryOffheapInputStream(long ptr, int cap, boolean forceHeap) {
+ this.ptr = ptr;
+ this.cap = cap;
+ this.forceHeap = forceHeap;
+
+ // The whole region is treated as readable data.
+ len = cap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int remaining() {
+ return cap - pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] array() {
+ // There is no backing heap array, so a copy is the only thing that can be returned.
+ return arrayCopy();
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] arrayCopy() {
+ byte[] res = new byte[len];
+
+ UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, res.length);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasArray() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected byte readByteAndShift() {
+ return UNSAFE.getByte(ptr + pos++);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void copyAndShift(Object target, long off, int len) {
+ UNSAFE.copyMemory(null, ptr + pos, target, off, len);
+
+ shift(len);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected short readShortFast() {
+ return UNSAFE.getShort(ptr + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected char readCharFast() {
+ return UNSAFE.getChar(ptr + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int readIntFast() {
+ return UNSAFE.getInt(ptr + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long readLongFast() {
+ return UNSAFE.getLong(ptr + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected byte readBytePositioned0(int pos) {
+ return UNSAFE.getByte(ptr + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected short readShortPositioned0(int pos) {
+ short res = UNSAFE.getShort(ptr + pos);
+
+ // Unlike the *Fast reads, positioned reads do the byte swap themselves.
+ if (!LITTLE_ENDIAN)
+ res = Short.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int readIntPositioned0(int pos) {
+ int res = UNSAFE.getInt(ptr + pos);
+
+ // Unlike the *Fast reads, positioned reads do the byte swap themselves.
+ if (!LITTLE_ENDIAN)
+ res = Integer.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long offheapPointer() {
+ return forceHeap ? 0 : ptr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java
new file mode 100644
index 0000000..24b65b2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary.streams;
+
+/**
+ * Binary offheap output stream.
+ * <p>
+ * Writes go to a native memory region addressed by a raw pointer. The region is either allocated
+ * by the stream or supplied by the caller, and is released by {@link #close()} in both cases.
+ */
+public class BinaryOffheapOutputStream extends BinaryAbstractOutputStream {
+ /** Pointer. Reset to {@code 0} once the stream has been closed. */
+ private long ptr;
+
+ /** Length of bytes that can be used before resize is necessary. */
+ private int cap;
+
+ /**
+ * Constructor. Allocates a new memory region of {@code cap} bytes.
+ *
+ * @param cap Capacity.
+ */
+ public BinaryOffheapOutputStream(int cap) {
+ this(0, cap);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param ptr Pointer to existing address, or {@code 0} to allocate {@code cap} bytes.
+ * @param cap Capacity.
+ */
+ public BinaryOffheapOutputStream(long ptr, int cap) {
+ // NOTE(review): allocate() is overridable and is invoked from the constructor, so an
+ // override runs before the subclass' own fields are initialized.
+ this.ptr = ptr == 0 ? allocate(cap) : ptr;
+
+ this.cap = cap;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Releases the memory behind the current pointer, including a pointer that was passed to the
+ * constructor. Repeated calls are no-ops, so the same memory is never released twice.
+ */
+ @Override public void close() {
+ long ptr0 = ptr;
+
+ if (ptr0 != 0) {
+ // Forget the pointer before releasing it so that a second close() cannot free it again.
+ ptr = 0;
+
+ release(ptr0);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void ensureCapacity(int cnt) {
+ if (cnt > cap) {
+ int newCap = capacity(cap, cnt);
+
+ // The region may move, so the pointer must be refreshed together with the capacity.
+ ptr = reallocate(ptr, newCap);
+
+ cap = newCap;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] array() {
+ // There is no backing heap array, so a copy is the only thing that can be returned.
+ return arrayCopy();
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] arrayCopy() {
+ byte[] res = new byte[pos];
+
+ UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, pos);
+
+ return res;
+ }
+
+ /**
+ * @return Pointer.
+ */
+ public long pointer() {
+ return ptr;
+ }
+
+ /**
+ * @return Capacity.
+ */
+ public int capacity() {
+ return cap;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeByteAndShift(byte val) {
+ UNSAFE.putByte(ptr + pos++, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void copyAndShift(Object src, long offset, int len) {
+ UNSAFE.copyMemory(src, offset, null, ptr + pos, len);
+
+ shift(len);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeShortFast(short val) {
+ UNSAFE.putShort(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeCharFast(char val) {
+ UNSAFE.putChar(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeIntFast(int val) {
+ UNSAFE.putInt(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeLongFast(long val) {
+ UNSAFE.putLong(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasArray() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteByte(byte val) {
+ UNSAFE.putByte(ptr + pos++, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteShort(short val) {
+ if (!LITTLE_ENDIAN)
+ val = Short.reverseBytes(val);
+
+ UNSAFE.putShort(ptr + pos, val);
+
+ shift(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteShort(int pos, short val) {
+ if (!LITTLE_ENDIAN)
+ val = Short.reverseBytes(val);
+
+ UNSAFE.putShort(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteChar(char val) {
+ if (!LITTLE_ENDIAN)
+ val = Character.reverseBytes(val);
+
+ UNSAFE.putChar(ptr + pos, val);
+
+ shift(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteInt(int val) {
+ if (!LITTLE_ENDIAN)
+ val = Integer.reverseBytes(val);
+
+ UNSAFE.putInt(ptr + pos, val);
+
+ shift(4);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteInt(int pos, int val) {
+ if (!LITTLE_ENDIAN)
+ val = Integer.reverseBytes(val);
+
+ UNSAFE.putInt(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteLong(long val) {
+ if (!LITTLE_ENDIAN)
+ val = Long.reverseBytes(val);
+
+ UNSAFE.putLong(ptr + pos, val);
+
+ shift(8);
+ }
+
+ /**
+ * Allocate memory.
+ *
+ * @param cap Capacity.
+ * @return Pointer.
+ */
+ protected long allocate(int cap) {
+ return UNSAFE.allocateMemory(cap);
+ }
+
+ /**
+ * Reallocate memory.
+ *
+ * @param ptr Old pointer.
+ * @param cap Capacity.
+ * @return New pointer.
+ */
+ protected long reallocate(long ptr, int cap) {
+ return UNSAFE.reallocateMemory(ptr, cap);
+ }
+
+ /**
+ * Release memory.
+ *
+ * @param ptr Pointer.
+ */
+ protected void release(long ptr) {
+ UNSAFE.freeMemory(ptr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOutputStream.java
new file mode 100644
index 0000000..1c3f4bf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOutputStream.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary.streams;
+
+/**
+ * Binary output stream.
+ * <p>
+ * NOTE(review): the {@code unsafeWrite*} implementations visible alongside this interface store
+ * values without any capacity check, so callers presumably have to reserve space with
+ * {@link #unsafeEnsure(int)} first - confirm against the abstract base class.
+ */
+public interface BinaryOutputStream extends BinaryStream, AutoCloseable {
+ /**
+ * Write byte value.
+ *
+ * @param val Byte value.
+ */
+ public void writeByte(byte val);
+
+ /**
+ * Write byte array.
+ *
+ * @param val Byte array.
+ */
+ public void writeByteArray(byte[] val);
+
+ /**
+ * Write boolean value.
+ *
+ * @param val Boolean value.
+ */
+ public void writeBoolean(boolean val);
+
+ /**
+ * Write boolean array.
+ *
+ * @param val Boolean array.
+ */
+ public void writeBooleanArray(boolean[] val);
+
+ /**
+ * Write short value.
+ *
+ * @param val Short value.
+ */
+ public void writeShort(short val);
+
+ /**
+ * Write short array.
+ *
+ * @param val Short array.
+ */
+ public void writeShortArray(short[] val);
+
+ /**
+ * Write char value.
+ *
+ * @param val Char value.
+ */
+ public void writeChar(char val);
+
+ /**
+ * Write char array.
+ *
+ * @param val Char array.
+ */
+ public void writeCharArray(char[] val);
+
+ /**
+ * Write int value.
+ *
+ * @param val Int value.
+ */
+ public void writeInt(int val);
+
+ /**
+ * Write short value at the given position.
+ *
+ * @param pos Position.
+ * @param val Value.
+ */
+ public void writeShort(int pos, short val);
+
+ /**
+ * Write int value at the given position.
+ *
+ * @param pos Position.
+ * @param val Value.
+ */
+ public void writeInt(int pos, int val);
+
+ /**
+ * Write int array.
+ *
+ * @param val Int array.
+ */
+ public void writeIntArray(int[] val);
+
+ /**
+ * Write float value.
+ *
+ * @param val Float value.
+ */
+ public void writeFloat(float val);
+
+ /**
+ * Write float array.
+ *
+ * @param val Float array.
+ */
+ public void writeFloatArray(float[] val);
+
+ /**
+ * Write long value.
+ *
+ * @param val Long value.
+ */
+ public void writeLong(long val);
+
+ /**
+ * Write long array.
+ *
+ * @param val Long array.
+ */
+ public void writeLongArray(long[] val);
+
+ /**
+ * Write double value.
+ *
+ * @param val Double value.
+ */
+ public void writeDouble(double val);
+
+ /**
+ * Write double array.
+ *
+ * @param val Double array.
+ */
+ public void writeDoubleArray(double[] val);
+
+ /**
+ * Write part of a byte array.
+ *
+ * @param arr Array.
+ * @param off Offset.
+ * @param len Length.
+ */
+ public void write(byte[] arr, int off, int len);
+
+ /**
+ * Write data from unmanaged memory.
+ *
+ * @param addr Address.
+ * @param cnt Count.
+ */
+ public void write(long addr, int cnt);
+
+ /**
+ * Close the stream releasing resources.
+ * <p>
+ * Redeclared without a {@code throws} clause, so closing never throws a checked exception.
+ */
+ @Override public void close();
+
+ /**
+ * Set position in unsafe mode.
+ *
+ * @param pos Position.
+ */
+ public void unsafePosition(int pos);
+
+ /**
+ * Ensure capacity for unsafe writes.
+ *
+ * @param cap Capacity.
+ */
+ public void unsafeEnsure(int cap);
+
+ /**
+ * Write byte in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteByte(byte val);
+
+ /**
+ * Write boolean in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteBoolean(boolean val);
+
+ /**
+ * Write short in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteShort(short val);
+
+ /**
+ * Write short at the given position in unsafe mode.
+ *
+ * @param pos Position.
+ * @param val Value.
+ */
+ public void unsafeWriteShort(int pos, short val);
+
+ /**
+ * Write char in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteChar(char val);
+
+ /**
+ * Write int in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteInt(int val);
+
+ /**
+ * Write int at the given position in unsafe mode.
+ *
+ * @param pos Position.
+ * @param val Value.
+ */
+ public void unsafeWriteInt(int pos, int val);
+
+ /**
+ * Write long in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteLong(long val);
+
+ /**
+ * Write float in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteFloat(float val);
+
+ /**
+ * Write double in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteDouble(double val);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java
new file mode 100644
index 0000000..229e34c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary.streams;
+
+/**
+ * Binary stream.
+ */
+public interface BinaryStream {
+ /**
+ * @return Position.
+ */
+ public int position();
+
+ /**
+ * @param pos Position.
+ */
+ public void position(int pos);
+
+ /**
+ * @return Underlying array.
+ */
+ public byte[] array();
+
+ /**
+ * @return Copy of data in the stream.
+ */
+ public byte[] arrayCopy();
+
+ /**
+ * @return Offheap pointer if stream is offheap based, otherwise {@code 0}.
+ */
+ public long offheapPointer();
+
+ /**
+ * @return {@code True} if stream is array based.
+ */
+ public boolean hasArray();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractInputStream.java
deleted file mode 100644
index 9d36b47..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractInputStream.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.binary.streams;
-
-import org.apache.ignite.binary.BinaryObjectException;
-
-/**
- * Portable abstract input stream.
- */
-public abstract class PortableAbstractInputStream extends PortableAbstractStream
- implements PortableInputStream {
- /** Length of data inside array. */
- protected int len;
-
- /** {@inheritDoc} */
- @Override public byte readByte() {
- ensureEnoughData(1);
-
- return readByteAndShift();
- }
-
- /** {@inheritDoc} */
- @Override public byte[] readByteArray(int cnt) {
- ensureEnoughData(cnt);
-
- byte[] res = new byte[cnt];
-
- copyAndShift(res, BYTE_ARR_OFF, cnt);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readBoolean() {
- return readByte() == BYTE_ONE;
- }
-
- /** {@inheritDoc} */
- @Override public boolean[] readBooleanArray(int cnt) {
- ensureEnoughData(cnt);
-
- boolean[] res = new boolean[cnt];
-
- copyAndShift(res, BOOLEAN_ARR_OFF, cnt);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public short readShort() {
- ensureEnoughData(2);
-
- short res = readShortFast();
-
- shift(2);
-
- if (!LITTLE_ENDIAN)
- res = Short.reverseBytes(res);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public short[] readShortArray(int cnt) {
- int len = cnt << 1;
-
- ensureEnoughData(len);
-
- short[] res = new short[cnt];
-
- copyAndShift(res, SHORT_ARR_OFF, len);
-
- if (!LITTLE_ENDIAN) {
- for (int i = 0; i < res.length; i++)
- res[i] = Short.reverseBytes(res[i]);
- }
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public char readChar() {
- ensureEnoughData(2);
-
- char res = readCharFast();
-
- shift(2);
-
- if (!LITTLE_ENDIAN)
- res = Character.reverseBytes(res);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public char[] readCharArray(int cnt) {
- int len = cnt << 1;
-
- ensureEnoughData(len);
-
- char[] res = new char[cnt];
-
- copyAndShift(res, CHAR_ARR_OFF, len);
-
- if (!LITTLE_ENDIAN) {
- for (int i = 0; i < res.length; i++)
- res[i] = Character.reverseBytes(res[i]);
- }
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public int readInt() {
- ensureEnoughData(4);
-
- int res = readIntFast();
-
- shift(4);
-
- if (!LITTLE_ENDIAN)
- res = Integer.reverseBytes(res);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public int[] readIntArray(int cnt) {
- int len = cnt << 2;
-
- ensureEnoughData(len);
-
- int[] res = new int[cnt];
-
- copyAndShift(res, INT_ARR_OFF, len);
-
- if (!LITTLE_ENDIAN) {
- for (int i = 0; i < res.length; i++)
- res[i] = Integer.reverseBytes(res[i]);
- }
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public byte readBytePositioned(int pos) {
- int delta = pos + 1 - this.pos;
-
- if (delta > 0)
- ensureEnoughData(delta);
-
- return readBytePositioned0(pos);
- }
-
- /** {@inheritDoc} */
- @Override public short readShortPositioned(int pos) {
- int delta = pos + 2 - this.pos;
-
- if (delta > 0)
- ensureEnoughData(delta);
-
- return readShortPositioned0(pos);
- }
-
- /** {@inheritDoc} */
- @Override public int readIntPositioned(int pos) {
- int delta = pos + 4 - this.pos;
-
- if (delta > 0)
- ensureEnoughData(delta);
-
- return readIntPositioned0(pos);
- }
-
- /** {@inheritDoc} */
- @Override public float readFloat() {
- return Float.intBitsToFloat(readInt());
- }
-
- /** {@inheritDoc} */
- @Override public float[] readFloatArray(int cnt) {
- int len = cnt << 2;
-
- ensureEnoughData(len);
-
- float[] res = new float[cnt];
-
- if (LITTLE_ENDIAN)
- copyAndShift(res, FLOAT_ARR_OFF, len);
- else {
- for (int i = 0; i < res.length; i++) {
- int x = readIntFast();
-
- shift(4);
-
- res[i] = Float.intBitsToFloat(Integer.reverseBytes(x));
- }
- }
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public long readLong() {
- ensureEnoughData(8);
-
- long res = readLongFast();
-
- shift(8);
-
- if (!LITTLE_ENDIAN)
- res = Long.reverseBytes(res);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public long[] readLongArray(int cnt) {
- int len = cnt << 3;
-
- ensureEnoughData(len);
-
- long[] res = new long[cnt];
-
- copyAndShift(res, LONG_ARR_OFF, len);
-
- if (!LITTLE_ENDIAN) {
- for (int i = 0; i < res.length; i++)
- res[i] = Long.reverseBytes(res[i]);
- }
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public double readDouble() {
- return Double.longBitsToDouble(readLong());
- }
-
- /** {@inheritDoc} */
- @Override public double[] readDoubleArray(int cnt) {
- int len = cnt << 3;
-
- ensureEnoughData(len);
-
- double[] res = new double[cnt];
-
- if (LITTLE_ENDIAN)
- copyAndShift(res, DOUBLE_ARR_OFF, len);
- else {
- for (int i = 0; i < res.length; i++) {
- long x = readLongFast();
-
- shift(8);
-
- res[i] = Double.longBitsToDouble(Long.reverseBytes(x));
- }
- }
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public int read(byte[] arr, int off, int len) {
- if (len > remaining())
- len = remaining();
-
- copyAndShift(arr, BYTE_ARR_OFF + off, len);
-
- return len;
- }
-
- /** {@inheritDoc} */
- @Override public void position(int pos) {
- if (remaining() + this.pos < pos)
- throw new BinaryObjectException("Position is out of bounds: " + pos);
- else
- this.pos = pos;
- }
-
- /** {@inheritDoc} */
- @Override public long offheapPointer() {
- return 0;
- }
-
- /**
- * Ensure that there is enough data.
- *
- * @param cnt Length.
- */
- protected void ensureEnoughData(int cnt) {
- if (remaining() < cnt)
- throw new BinaryObjectException("Not enough data to read the value [position=" + pos +
- ", requiredBytes=" + cnt + ", remainingBytes=" + remaining() + ']');
- }
-
- /**
- * Read next byte from the stream and perform shift.
- *
- * @return Next byte.
- */
- protected abstract byte readByteAndShift();
-
- /**
- * Copy data to target object shift position afterwards.
- *
- * @param target Target.
- * @param off Offset.
- * @param len Length.
- */
- protected abstract void copyAndShift(Object target, long off, int len);
-
- /**
- * Read short value (fast path).
- *
- * @return Short value.
- */
- protected abstract short readShortFast();
-
- /**
- * Read char value (fast path).
- *
- * @return Char value.
- */
- protected abstract char readCharFast();
-
- /**
- * Read int value (fast path).
- *
- * @return Int value.
- */
- protected abstract int readIntFast();
-
- /**
- * Read long value (fast path).
- *
- * @return Long value.
- */
- protected abstract long readLongFast();
-
- /**
- * Internal routine for positioned byte value read.
- *
- * @param pos Position.
- * @return Int value.
- */
- protected abstract byte readBytePositioned0(int pos);
-
- /**
- * Internal routine for positioned short value read.
- *
- * @param pos Position.
- * @return Int value.
- */
- protected abstract short readShortPositioned0(int pos);
-
- /**
- * Internal routine for positioned int value read.
- *
- * @param pos Position.
- * @return Int value.
- */
- protected abstract int readIntPositioned0(int pos);
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractOutputStream.java
deleted file mode 100644
index 85064c5..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractOutputStream.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.binary.streams;
-
-/**
- * Base portable output stream.
- */
-public abstract class PortableAbstractOutputStream extends PortableAbstractStream
- implements PortableOutputStream {
- /** Minimal capacity when it is reasonable to start doubling resize. */
- private static final int MIN_CAP = 256;
-
- /** {@inheritDoc} */
- @Override public void writeByte(byte val) {
- ensureCapacity(pos + 1);
-
- writeByteAndShift(val);
- }
-
- /** {@inheritDoc} */
- @Override public void writeByteArray(byte[] val) {
- ensureCapacity(pos + val.length);
-
- copyAndShift(val, BYTE_ARR_OFF, val.length);
- }
-
- /** {@inheritDoc} */
- @Override public void writeBoolean(boolean val) {
- writeByte(val ? BYTE_ONE : BYTE_ZERO);
- }
-
- /** {@inheritDoc} */
- @Override public void writeBooleanArray(boolean[] val) {
- ensureCapacity(pos + val.length);
-
- copyAndShift(val, BOOLEAN_ARR_OFF, val.length);
- }
-
- /** {@inheritDoc} */
- @Override public void writeShort(short val) {
- ensureCapacity(pos + 2);
-
- if (!LITTLE_ENDIAN)
- val = Short.reverseBytes(val);
-
- writeShortFast(val);
-
- shift(2);
- }
-
- /** {@inheritDoc} */
- @Override public void writeShortArray(short[] val) {
- int cnt = val.length << 1;
-
- ensureCapacity(pos + cnt);
-
- if (LITTLE_ENDIAN)
- copyAndShift(val, SHORT_ARR_OFF, cnt);
- else {
- for (short item : val)
- writeShortFast(Short.reverseBytes(item));
-
- shift(cnt);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeChar(char val) {
- ensureCapacity(pos + 2);
-
- if (!LITTLE_ENDIAN)
- val = Character.reverseBytes(val);
-
- writeCharFast(val);
-
- shift(2);
- }
-
- /** {@inheritDoc} */
- @Override public void writeCharArray(char[] val) {
- int cnt = val.length << 1;
-
- ensureCapacity(pos + cnt);
-
- if (LITTLE_ENDIAN)
- copyAndShift(val, CHAR_ARR_OFF, cnt);
- else {
- for (char item : val)
- writeCharFast(Character.reverseBytes(item));
-
- shift(cnt);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeInt(int val) {
- ensureCapacity(pos + 4);
-
- if (!LITTLE_ENDIAN)
- val = Integer.reverseBytes(val);
-
- writeIntFast(val);
-
- shift(4);
- }
-
- /** {@inheritDoc} */
- @Override public void writeShort(int pos, short val) {
- ensureCapacity(pos + 2);
-
- unsafeWriteShort(pos, val);
- }
-
- /** {@inheritDoc} */
- @Override public void writeInt(int pos, int val) {
- ensureCapacity(pos + 4);
-
- unsafeWriteInt(pos, val);
- }
-
- /** {@inheritDoc} */
- @Override public void writeIntArray(int[] val) {
- int cnt = val.length << 2;
-
- ensureCapacity(pos + cnt);
-
- if (LITTLE_ENDIAN)
- copyAndShift(val, INT_ARR_OFF, cnt);
- else {
- for (int item : val)
- writeIntFast(Integer.reverseBytes(item));
-
- shift(cnt);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeFloat(float val) {
- writeInt(Float.floatToIntBits(val));
- }
-
- /** {@inheritDoc} */
- @Override public void writeFloatArray(float[] val) {
- int cnt = val.length << 2;
-
- ensureCapacity(pos + cnt);
-
- if (LITTLE_ENDIAN)
- copyAndShift(val, FLOAT_ARR_OFF, cnt);
- else {
- for (float item : val) {
- writeIntFast(Integer.reverseBytes(Float.floatToIntBits(item)));
-
- shift(4);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeLong(long val) {
- ensureCapacity(pos + 8);
-
- if (!LITTLE_ENDIAN)
- val = Long.reverseBytes(val);
-
- writeLongFast(val);
-
- shift(8);
- }
-
- /** {@inheritDoc} */
- @Override public void writeLongArray(long[] val) {
- int cnt = val.length << 3;
-
- ensureCapacity(pos + cnt);
-
- if (LITTLE_ENDIAN)
- copyAndShift(val, LONG_ARR_OFF, cnt);
- else {
- for (long item : val)
- writeLongFast(Long.reverseBytes(item));
-
- shift(cnt);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeDouble(double val) {
- writeLong(Double.doubleToLongBits(val));
- }
-
- /** {@inheritDoc} */
- @Override public void writeDoubleArray(double[] val) {
- int cnt = val.length << 3;
-
- ensureCapacity(pos + cnt);
-
- if (LITTLE_ENDIAN)
- copyAndShift(val, DOUBLE_ARR_OFF, cnt);
- else {
- for (double item : val) {
- writeLongFast(Long.reverseBytes(Double.doubleToLongBits(item)));
-
- shift(8);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void write(byte[] arr, int off, int len) {
- ensureCapacity(pos + len);
-
- copyAndShift(arr, BYTE_ARR_OFF + off, len);
- }
-
- /** {@inheritDoc} */
- @Override public void write(long addr, int cnt) {
- ensureCapacity(pos + cnt);
-
- copyAndShift(null, addr, cnt);
- }
-
- /** {@inheritDoc} */
- @Override public void position(int pos) {
- ensureCapacity(pos);
-
- unsafePosition(pos);
- }
-
- /** {@inheritDoc} */
- @Override public long offheapPointer() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public void unsafeEnsure(int cap) {
- ensureCapacity(pos + cap);
- }
-
- /** {@inheritDoc} */
- @Override public void unsafePosition(int pos) {
- this.pos = pos;
- }
-
- /** {@inheritDoc} */
- @Override public void unsafeWriteBoolean(boolean val) {
- unsafeWriteByte(val ? BYTE_ONE : BYTE_ZERO);
- }
-
- /** {@inheritDoc} */
- @Override public void unsafeWriteFloat(float val) {
- unsafeWriteInt(Float.floatToIntBits(val));
- }
-
- /** {@inheritDoc} */
- @Override public void unsafeWriteDouble(double val) {
- unsafeWriteLong(Double.doubleToLongBits(val));
- }
-
- /**
- * Calculate new capacity.
- *
- * @param curCap Current capacity.
- * @param reqCap Required capacity.
- * @return New capacity.
- */
- protected static int capacity(int curCap, int reqCap) {
- int newCap;
-
- if (reqCap < MIN_CAP)
- newCap = MIN_CAP;
- else {
- newCap = curCap << 1;
-
- if (newCap < reqCap)
- newCap = reqCap;
- }
-
- return newCap;
- }
-
- /**
- * Write next byte to the stream.
- *
- * @param val Value.
- */
- protected abstract void writeByteAndShift(byte val);
-
- /**
- * Copy source object to the stream shift position afterwards.
- *
- * @param src Source.
- * @param off Offset.
- * @param len Length.
- */
- protected abstract void copyAndShift(Object src, long off, int len);
-
- /**
- * Write short value (fast path).
- *
- * @param val Short value.
- */
- protected abstract void writeShortFast(short val);
-
- /**
- * Write char value (fast path).
- *
- * @param val Char value.
- */
- protected abstract void writeCharFast(char val);
-
- /**
- * Write int value (fast path).
- *
- * @param val Int value.
- */
- protected abstract void writeIntFast(int val);
-
- /**
- * Write long value (fast path).
- *
- * @param val Long value.
- */
- protected abstract void writeLongFast(long val);
-
- /**
- * Ensure capacity.
- *
- * @param cnt Required byte count.
- */
- protected abstract void ensureCapacity(int cnt);
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractStream.java
deleted file mode 100644
index fcc32cb..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractStream.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.binary.streams;
-
-import java.nio.ByteOrder;
-import org.apache.ignite.internal.util.GridUnsafe;
-import sun.misc.Unsafe;
-
-/**
- * Portable abstract stream.
- */
-public abstract class PortableAbstractStream implements PortableStream {
- /** Byte: zero. */
- protected static final byte BYTE_ZERO = 0;
-
- /** Byte: one. */
- protected static final byte BYTE_ONE = 1;
-
- /** Whether little endian is used on the platform. */
- protected static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
-
- /** Unsafe instance. */
- protected static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
- /** Array offset: boolean. */
- protected static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
-
- /** Array offset: byte. */
- protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
-
- /** Array offset: short. */
- protected static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
-
- /** Array offset: char. */
- protected static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
-
- /** Array offset: int. */
- protected static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
-
- /** Array offset: float. */
- protected static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
-
- /** Array offset: long. */
- protected static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
-
- /** Array offset: double. */
- protected static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
-
- /** Position. */
- protected int pos;
-
- /** {@inheritDoc} */
- @Override public int position() {
- return pos;
- }
-
- /**
- * Shift position.
- *
- * @param cnt Byte count.
- */
- protected void shift(int cnt) {
- pos += cnt;
- }
-}