You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/21 08:48:44 UTC
[26/45] incubator-ignite git commit: ignite-1258: renaming portalbe
internal classes
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java
deleted file mode 100644
index 43d5490..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable.streams;
-
-import static org.apache.ignite.internal.portable.GridPortableThreadLocalMemoryAllocator.*;
-
-/**
- * Portable heap output stream.
- */
-public final class GridPortableHeapOutputStream extends GridPortableAbstractOutputStream {
- /** Default capacity. */
- private static final int DFLT_CAP = 1024;
-
- /** Allocator. */
- private final GridPortableMemoryAllocator alloc;
-
- /** Data. */
- private byte[] data;
-
- /**
- * Constructor.
- */
- public GridPortableHeapOutputStream() {
- this(DFLT_CAP, DFLT_ALLOC);
- }
-
- /**
- * Constructor.
- *
- * @param cap Initial capacity.
- */
- public GridPortableHeapOutputStream(int cap) {
- this(cap, THREAD_LOCAL_ALLOC);
- }
-
- /**
- * Constructor.
- *
- * @param cap Initial capacity.
- * @param alloc Allocator.
- */
- public GridPortableHeapOutputStream(int cap, GridPortableMemoryAllocator alloc) {
- data = alloc.allocate(cap);
-
- this.alloc = alloc;
- }
-
- /**
- * Constructor.
- *
- * @param data Data.
- */
- public GridPortableHeapOutputStream(byte[] data) {
- this(data, DFLT_ALLOC);
- }
-
- /**
- * Constructor.
- *
- * @param data Data.
- * @param alloc Allocator.
- */
- public GridPortableHeapOutputStream(byte[] data, GridPortableMemoryAllocator alloc) {
- this.data = data;
- this.alloc = alloc;
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- alloc.release(data, pos);
- }
-
- /** {@inheritDoc} */
- @Override public void ensureCapacity(int cnt) {
- if (cnt > data.length) {
- int newCap = capacity(data.length, cnt);
-
- data = alloc.reallocate(data, newCap);
- }
- }
-
- /** {@inheritDoc} */
- @Override public byte[] array() {
- return data;
- }
-
- /** {@inheritDoc} */
- @Override public byte[] arrayCopy() {
- byte[] res = new byte[pos];
-
- UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, pos);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasArray() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected void writeByteAndShift(byte val) {
- data[pos++] = val;
- }
-
- /** {@inheritDoc} */
- @Override protected void copyAndShift(Object src, long off, int len) {
- UNSAFE.copyMemory(src, off, data, BYTE_ARR_OFF + pos, len);
-
- shift(len);
- }
-
- /** {@inheritDoc} */
- @Override protected void writeShortFast(short val) {
- UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val);
- }
-
- /** {@inheritDoc} */
- @Override protected void writeCharFast(char val) {
- UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val);
- }
-
- /** {@inheritDoc} */
- @Override protected void writeIntFast(int val) {
- UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
- }
-
- /** {@inheritDoc} */
- @Override protected void writeLongFast(long val) {
- UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val);
- }
-
- /** {@inheritDoc} */
- @Override protected void writeIntPositioned(int pos, int val) {
- if (!LITTLE_ENDIAN)
- val = Integer.reverseBytes(val);
-
- UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java
deleted file mode 100644
index 4cfbd37..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable.streams;
-
-/**
- * Portable memory allocator.
- */
-public interface GridPortableMemoryAllocator {
- /** Default memory allocator. */
- public static final GridPortableMemoryAllocator DFLT_ALLOC = new GridPortableSimpleMemoryAllocator();
-
- /**
- * Allocate memory.
- *
- * @param size Size.
- * @return Data.
- */
- public byte[] allocate(int size);
-
- /**
- * Reallocates memory.
- *
- * @param data Current data chunk.
- * @param size New size required.
- *
- * @return Data.
- */
- public byte[] reallocate(byte[] data, int size);
-
- /**
- * Release memory.
- *
- * @param data Data.
- * @param maxMsgSize Max message size sent during the time the allocator is used.
- */
- public void release(byte[] data, int maxMsgSize);
-
- /**
- * Allocate memory.
- *
- * @param size Size.
- * @return Address.
- */
- public long allocateDirect(int size);
-
- /**
- * Reallocate memory.
- *
- * @param addr Address.
- * @param size Size.
- * @return Address.
- */
- public long reallocateDirect(long addr, int size);
-
- /**
- * Release memory.
- *
- * @param addr Address.
- */
- public void releaseDirect(long addr);
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java
deleted file mode 100644
index c65070c..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable.streams;
-
-/**
- * Portable off-heap input stream.
- */
-public class GridPortableOffheapInputStream extends GridPortableAbstractInputStream {
- /** Pointer. */
- private final long ptr;
-
- /** Capacity. */
- private final int cap;
-
- /** */
- private boolean forceHeap;
-
- /**
- * Constructor.
- *
- * @param ptr Pointer.
- * @param cap Capacity.
- */
- public GridPortableOffheapInputStream(long ptr, int cap) {
- this(ptr, cap, false);
- }
-
- /**
- * Constructor.
- *
- * @param ptr Pointer.
- * @param cap Capacity.
- * @param forceHeap If {@code true} method {@link #offheapPointer} returns 0 and unmarshalling will
- * create heap-based objects.
- */
- public GridPortableOffheapInputStream(long ptr, int cap, boolean forceHeap) {
- this.ptr = ptr;
- this.cap = cap;
- this.forceHeap = forceHeap;
-
- len = cap;
- }
-
- /** {@inheritDoc} */
- @Override public int remaining() {
- return cap - pos;
- }
-
- /** {@inheritDoc} */
- @Override public byte[] array() {
- return arrayCopy();
- }
-
- /** {@inheritDoc} */
- @Override public byte[] arrayCopy() {
- byte[] res = new byte[len];
-
- UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, res.length);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasArray() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override protected byte readByteAndShift() {
- return UNSAFE.getByte(ptr + pos++);
- }
-
- /** {@inheritDoc} */
- @Override protected void copyAndShift(Object target, long off, int len) {
- UNSAFE.copyMemory(null, ptr + pos, target, off, len);
-
- shift(len);
- }
-
- /** {@inheritDoc} */
- @Override protected short readShortFast() {
- return UNSAFE.getShort(ptr + pos);
- }
-
- /** {@inheritDoc} */
- @Override protected char readCharFast() {
- return UNSAFE.getChar(ptr + pos);
- }
-
- /** {@inheritDoc} */
- @Override protected int readIntFast() {
- return UNSAFE.getInt(ptr + pos);
- }
-
- /** {@inheritDoc} */
- @Override protected long readLongFast() {
- return UNSAFE.getLong(ptr + pos);
- }
-
- /** {@inheritDoc} */
- @Override protected int readIntPositioned(int pos) {
- int res = UNSAFE.getInt(ptr + pos);
-
- if (!LITTLE_ENDIAN)
- res = Integer.reverseBytes(res);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public long offheapPointer() {
- return forceHeap ? 0 : ptr;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java
deleted file mode 100644
index 41d49d4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable.streams;
-
-/**
- * Portable offheap output stream.
- */
-public class GridPortableOffheapOutputStream extends GridPortableAbstractOutputStream {
- /** Pointer. */
- private long ptr;
-
- /** Length of bytes that cen be used before resize is necessary. */
- private int cap;
-
- /**
- * Constructor.
- *
- * @param cap Capacity.
- */
- public GridPortableOffheapOutputStream(int cap) {
- this(0, cap);
- }
-
- /**
- * Constructor.
- *
- * @param ptr Pointer to existing address.
- * @param cap Capacity.
- */
- public GridPortableOffheapOutputStream(long ptr, int cap) {
- this.ptr = ptr == 0 ? allocate(cap) : ptr;
-
- this.cap = cap;
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- release(ptr);
- }
-
- /** {@inheritDoc} */
- @Override public void ensureCapacity(int cnt) {
- if (cnt > cap) {
- int newCap = capacity(cap, cnt);
-
- ptr = reallocate(ptr, newCap);
-
- cap = newCap;
- }
- }
-
- /** {@inheritDoc} */
- @Override public byte[] array() {
- return arrayCopy();
- }
-
- /** {@inheritDoc} */
- @Override public byte[] arrayCopy() {
- byte[] res = new byte[pos];
-
- UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, pos);
-
- return res;
- }
-
- /**
- * @return Pointer.
- */
- public long pointer() {
- return ptr;
- }
-
- /**
- * @return Capacity.
- */
- public int capacity() {
- return cap;
- }
-
- /** {@inheritDoc} */
- @Override protected void writeByteAndShift(byte val) {
- UNSAFE.putByte(ptr + pos++, val);
- }
-
- /** {@inheritDoc} */
- @Override protected void copyAndShift(Object src, long offset, int len) {
- UNSAFE.copyMemory(src, offset, null, ptr + pos, len);
-
- shift(len);
- }
-
- /** {@inheritDoc} */
- @Override protected void writeShortFast(short val) {
- UNSAFE.putShort(ptr + pos, val);
- }
-
- /** {@inheritDoc} */
- @Override protected void writeCharFast(char val) {
- UNSAFE.putChar(ptr + pos, val);
- }
-
- /** {@inheritDoc} */
- @Override protected void writeIntFast(int val) {
- UNSAFE.putInt(ptr + pos, val);
- }
-
- /** {@inheritDoc} */
- @Override protected void writeLongFast(long val) {
- UNSAFE.putLong(ptr + pos, val);
- }
-
- /** {@inheritDoc} */
- @Override protected void writeIntPositioned(int pos, int val) {
- if (!LITTLE_ENDIAN)
- val = Integer.reverseBytes(val);
-
- UNSAFE.putInt(ptr + pos, val);
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasArray() {
- return false;
- }
-
- /**
- * Allocate memory.
- *
- * @param cap Capacity.
- * @return Pointer.
- */
- protected long allocate(int cap) {
- return UNSAFE.allocateMemory(cap);
- }
-
- /**
- * Reallocate memory.
- *
- * @param ptr Old pointer.
- * @param cap Capacity.
- * @return New pointer.
- */
- protected long reallocate(long ptr, int cap) {
- return UNSAFE.reallocateMemory(ptr, cap);
- }
-
- /**
- * Release memory.
- *
- * @param ptr Pointer.
- */
- protected void release(long ptr) {
- UNSAFE.freeMemory(ptr);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java
deleted file mode 100644
index 8c27bf6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable.streams;
-
-import org.apache.ignite.internal.util.*;
-
-import sun.misc.*;
-
-/**
- * Naive implementation of portable memory allocator.
- */
-public class GridPortableSimpleMemoryAllocator implements GridPortableMemoryAllocator {
- /** Unsafe. */
- private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
- /** Array offset: byte. */
- protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
-
- /** {@inheritDoc} */
- @Override public byte[] allocate(int size) {
- return new byte[size];
- }
-
- /** {@inheritDoc} */
- @Override public byte[] reallocate(byte[] data, int size) {
- byte[] newData = new byte[size];
-
- UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length);
-
- return newData;
- }
-
- /** {@inheritDoc} */
- @Override public void release(byte[] data, int maxMsgSize) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public long allocateDirect(int size) {
- return UNSAFE.allocateMemory(size);
- }
-
- /** {@inheritDoc} */
- @Override public long reallocateDirect(long addr, int size) {
- return UNSAFE.reallocateMemory(addr, size);
- }
-
- /** {@inheritDoc} */
- @Override public void releaseDirect(long addr) {
- UNSAFE.freeMemory(addr);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java
new file mode 100644
index 0000000..9f0b1db
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import org.apache.ignite.portable.*;
+
+/**
+ * Portable abstract input stream.
+ */
+public abstract class PortableAbstractInputStream extends PortableAbstractStream
+ implements PortableInputStream {
+ /** Length of data inside array. */
+ protected int len;
+
+ /** {@inheritDoc} */
+ @Override public byte readByte() {
+ ensureEnoughData(1);
+
+ return readByteAndShift();
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] readByteArray(int cnt) {
+ ensureEnoughData(cnt);
+
+ byte[] res = new byte[cnt];
+
+ copyAndShift(res, BYTE_ARR_OFF, cnt);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readBoolean() {
+ return readByte() == BYTE_ONE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean[] readBooleanArray(int cnt) {
+ ensureEnoughData(cnt);
+
+ boolean[] res = new boolean[cnt];
+
+ copyAndShift(res, BOOLEAN_ARR_OFF, cnt);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short readShort() {
+ ensureEnoughData(2);
+
+ short res = readShortFast();
+
+ shift(2);
+
+ if (!LITTLE_ENDIAN)
+ res = Short.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short[] readShortArray(int cnt) {
+ int len = cnt << 1;
+
+ ensureEnoughData(len);
+
+ short[] res = new short[cnt];
+
+ copyAndShift(res, SHORT_ARR_OFF, len);
+
+ if (!LITTLE_ENDIAN) {
+ for (int i = 0; i < res.length; i++)
+ res[i] = Short.reverseBytes(res[i]);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public char readChar() {
+ ensureEnoughData(2);
+
+ char res = readCharFast();
+
+ shift(2);
+
+ if (!LITTLE_ENDIAN)
+ res = Character.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public char[] readCharArray(int cnt) {
+ int len = cnt << 1;
+
+ ensureEnoughData(len);
+
+ char[] res = new char[cnt];
+
+ copyAndShift(res, CHAR_ARR_OFF, len);
+
+ if (!LITTLE_ENDIAN) {
+ for (int i = 0; i < res.length; i++)
+ res[i] = Character.reverseBytes(res[i]);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readInt() {
+ ensureEnoughData(4);
+
+ int res = readIntFast();
+
+ shift(4);
+
+ if (!LITTLE_ENDIAN)
+ res = Integer.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] readIntArray(int cnt) {
+ int len = cnt << 2;
+
+ ensureEnoughData(len);
+
+ int[] res = new int[cnt];
+
+ copyAndShift(res, INT_ARR_OFF, len);
+
+ if (!LITTLE_ENDIAN) {
+ for (int i = 0; i < res.length; i++)
+ res[i] = Integer.reverseBytes(res[i]);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readInt(int pos) {
+ int delta = pos + 4 - this.pos;
+
+ if (delta > 0)
+ ensureEnoughData(delta);
+
+ return readIntPositioned(pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public float readFloat() {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ /** {@inheritDoc} */
+ @Override public float[] readFloatArray(int cnt) {
+ int len = cnt << 2;
+
+ ensureEnoughData(len);
+
+ float[] res = new float[cnt];
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(res, FLOAT_ARR_OFF, len);
+ else {
+ for (int i = 0; i < res.length; i++) {
+ int x = readIntFast();
+
+ shift(4);
+
+ res[i] = Float.intBitsToFloat(Integer.reverseBytes(x));
+ }
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long readLong() {
+ ensureEnoughData(8);
+
+ long res = readLongFast();
+
+ shift(8);
+
+ if (!LITTLE_ENDIAN)
+ res = Long.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long[] readLongArray(int cnt) {
+ int len = cnt << 3;
+
+ ensureEnoughData(len);
+
+ long[] res = new long[cnt];
+
+ copyAndShift(res, LONG_ARR_OFF, len);
+
+ if (!LITTLE_ENDIAN) {
+ for (int i = 0; i < res.length; i++)
+ res[i] = Long.reverseBytes(res[i]);
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public double readDouble() {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ /** {@inheritDoc} */
+ @Override public double[] readDoubleArray(int cnt) {
+ int len = cnt << 3;
+
+ ensureEnoughData(len);
+
+ double[] res = new double[cnt];
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(res, DOUBLE_ARR_OFF, len);
+ else {
+ for (int i = 0; i < res.length; i++) {
+ long x = readLongFast();
+
+ shift(8);
+
+ res[i] = Double.longBitsToDouble(Long.reverseBytes(x));
+ }
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(byte[] arr, int off, int len) {
+ if (len > remaining())
+ len = remaining();
+
+ copyAndShift(arr, BYTE_ARR_OFF + off, len);
+
+ return len;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void position(int pos) {
+ if (pos < 0 || remaining() + this.pos < pos) // Negative positions are as invalid as positions past the end.
+ throw new PortableException("Position is out of bounds: " + pos);
+ else
+ this.pos = pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long offheapPointer() {
+ return 0;
+ }
+
+ /**
+ * Ensure that there is enough data.
+ *
+ * @param cnt Number of bytes that must still be readable from the current position.
+ */
+ protected void ensureEnoughData(int cnt) {
+ if (remaining() < cnt)
+ throw new PortableException("Not enough data to read the value [position=" + pos +
+ ", requiredBytes=" + cnt + ", remainingBytes=" + remaining() + ']');
+ }
+
+ /**
+ * Read next byte from the stream and perform shift.
+ *
+ * @return Next byte.
+ */
+ protected abstract byte readByteAndShift();
+
+ /**
+ * Copy data to target object and shift position afterwards.
+ *
+ * @param target Target.
+ * @param off Offset.
+ * @param len Length.
+ */
+ protected abstract void copyAndShift(Object target, long off, int len);
+
+ /**
+ * Read short value (fast path).
+ *
+ * @return Short value.
+ */
+ protected abstract short readShortFast();
+
+ /**
+ * Read char value (fast path).
+ *
+ * @return Char value.
+ */
+ protected abstract char readCharFast();
+
+ /**
+ * Read int value (fast path).
+ *
+ * @return Int value.
+ */
+ protected abstract int readIntFast();
+
+ /**
+ * Read long value (fast path).
+ *
+ * @return Long value.
+ */
+ protected abstract long readLongFast();
+
+ /**
+ * Internal routine for positioned int value read.
+ *
+ * @param pos Position.
+ * @return Int value.
+ */
+ protected abstract int readIntPositioned(int pos);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
new file mode 100644
index 0000000..2c3179a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Base portable output stream.
+ */
+public abstract class PortableAbstractOutputStream extends PortableAbstractStream
+ implements PortableOutputStream {
+ /** Minimal capacity when it is reasonable to start doubling resize. */
+ private static final int MIN_CAP = 256;
+
+ /** {@inheritDoc} */
+ @Override public void writeByte(byte val) {
+ ensureCapacity(pos + 1);
+
+ writeByteAndShift(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeByteArray(byte[] val) {
+ ensureCapacity(pos + val.length);
+
+ copyAndShift(val, BYTE_ARR_OFF, val.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBoolean(boolean val) {
+ writeByte(val ? BYTE_ONE : BYTE_ZERO);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBooleanArray(boolean[] val) {
+ ensureCapacity(pos + val.length);
+
+ copyAndShift(val, BOOLEAN_ARR_OFF, val.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeShort(short val) {
+ ensureCapacity(pos + 2);
+
+ if (!LITTLE_ENDIAN)
+ val = Short.reverseBytes(val);
+
+ writeShortFast(val);
+
+ shift(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeShortArray(short[] val) {
+ int cnt = val.length << 1;
+
+ ensureCapacity(pos + cnt);
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(val, SHORT_ARR_OFF, cnt);
+ else {
+ for (short item : val) {
+ writeShortFast(Short.reverseBytes(item));
+ shift(2); // Fast write does not move the position, so advance per element to avoid overwriting.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeChar(char val) {
+ ensureCapacity(pos + 2);
+
+ if (!LITTLE_ENDIAN)
+ val = Character.reverseBytes(val);
+
+ writeCharFast(val);
+
+ shift(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeCharArray(char[] val) {
+ int cnt = val.length << 1;
+
+ ensureCapacity(pos + cnt);
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(val, CHAR_ARR_OFF, cnt);
+ else {
+ for (char item : val) {
+ writeCharFast(Character.reverseBytes(item));
+ shift(2); // Fast write does not move the position, so advance per element to avoid overwriting.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeInt(int val) {
+ ensureCapacity(pos + 4);
+
+ if (!LITTLE_ENDIAN)
+ val = Integer.reverseBytes(val);
+
+ writeIntFast(val);
+
+ shift(4);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeInt(int pos, int val) {
+ ensureCapacity(pos + 4);
+
+ writeIntPositioned(pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeIntArray(int[] val) {
+ int cnt = val.length << 2;
+
+ ensureCapacity(pos + cnt);
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(val, INT_ARR_OFF, cnt);
+ else {
+ for (int item : val) {
+ writeIntFast(Integer.reverseBytes(item));
+ shift(4); // Fast write does not move the position, so advance per element to avoid overwriting.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeFloat(float val) {
+ writeInt(Float.floatToIntBits(val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeFloatArray(float[] val) {
+ int cnt = val.length << 2;
+
+ ensureCapacity(pos + cnt);
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(val, FLOAT_ARR_OFF, cnt);
+ else {
+ for (float item : val) {
+ writeIntFast(Integer.reverseBytes(Float.floatToIntBits(item)));
+
+ shift(4);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLong(long val) {
+ ensureCapacity(pos + 8);
+
+ if (!LITTLE_ENDIAN)
+ val = Long.reverseBytes(val);
+
+ writeLongFast(val);
+
+ shift(8);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLongArray(long[] val) {
+ int cnt = val.length << 3;
+
+ ensureCapacity(pos + cnt);
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(val, LONG_ARR_OFF, cnt);
+ else {
+ for (long item : val) {
+ writeLongFast(Long.reverseBytes(item));
+ shift(8); // Fast write does not move the position, so advance per element to avoid overwriting.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeDouble(double val) {
+ writeLong(Double.doubleToLongBits(val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeDoubleArray(double[] val) {
+ int cnt = val.length << 3;
+
+ ensureCapacity(pos + cnt);
+
+ if (LITTLE_ENDIAN)
+ copyAndShift(val, DOUBLE_ARR_OFF, cnt);
+ else {
+ for (double item : val) {
+ writeLongFast(Long.reverseBytes(Double.doubleToLongBits(item)));
+
+ shift(8);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(byte[] arr, int off, int len) {
+ ensureCapacity(pos + len);
+
+ copyAndShift(arr, BYTE_ARR_OFF + off, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(long addr, int cnt) {
+ ensureCapacity(pos + cnt);
+
+ copyAndShift(null, addr, cnt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void position(int pos) {
+ ensureCapacity(pos);
+
+ this.pos = pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long offheapPointer() {
+ return 0;
+ }
+
+ /**
+ * Calculate new capacity.
+ *
+ * @param curCap Current capacity.
+ * @param reqCap Required capacity.
+ * @return New capacity.
+ */
+ protected static int capacity(int curCap, int reqCap) {
+ int newCap;
+
+ if (reqCap < MIN_CAP)
+ newCap = MIN_CAP;
+ else {
+ newCap = curCap << 1;
+
+ if (newCap < reqCap)
+ newCap = reqCap;
+ }
+
+ return newCap;
+ }
+
+ /**
+ * Write next byte to the stream.
+ *
+ * @param val Value.
+ */
+ protected abstract void writeByteAndShift(byte val);
+
+ /**
+ * Copy source object to the stream and shift position afterwards.
+ *
+ * @param src Source.
+ * @param off Offset.
+ * @param len Length.
+ */
+ protected abstract void copyAndShift(Object src, long off, int len);
+
+ /**
+ * Write short value (fast path).
+ *
+ * @param val Short value.
+ */
+ protected abstract void writeShortFast(short val);
+
+ /**
+ * Write char value (fast path).
+ *
+ * @param val Char value.
+ */
+ protected abstract void writeCharFast(char val);
+
+ /**
+ * Write int value (fast path).
+ *
+ * @param val Int value.
+ */
+ protected abstract void writeIntFast(int val);
+
+ /**
+ * Write long value (fast path).
+ *
+ * @param val Long value.
+ */
+ protected abstract void writeLongFast(long val);
+
+ /**
+ * Write int value to the given position.
+ *
+ * @param pos Position.
+ * @param val Value.
+ */
+ protected abstract void writeIntPositioned(int pos, int val);
+
+ /**
+ * Ensure capacity.
+ *
+ * @param cnt Required total capacity in bytes (callers pass current position plus bytes to write).
+ */
+ protected abstract void ensureCapacity(int cnt);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractStream.java
new file mode 100644
index 0000000..a39662b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractStream.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import org.apache.ignite.internal.util.*;
+
+import sun.misc.*;
+
+import java.nio.*;
+
+/**
+ * Base class for portable streams: shared constants, array base offsets and position tracking.
+ */
+public abstract class PortableAbstractStream implements PortableStream {
+ /** Byte: zero. */
+ protected static final byte BYTE_ZERO = 0;
+
+ /** Byte: one. */
+ protected static final byte BYTE_ONE = 1;
+
+ /** Whether the native byte order of the platform is little endian. */
+ protected static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
+
+ /** Unsafe instance. */
+ protected static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+ /** Base offset of the first element of a {@code boolean[]}. */
+ protected static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
+
+ /** Base offset of the first element of a {@code byte[]}. */
+ protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+ /** Base offset of the first element of a {@code short[]}. */
+ protected static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
+
+ /** Base offset of the first element of a {@code char[]}. */
+ protected static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
+
+ /** Base offset of the first element of an {@code int[]}. */
+ protected static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
+
+ /** Base offset of the first element of a {@code float[]}. */
+ protected static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
+
+ /** Base offset of the first element of a {@code long[]}. */
+ protected static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
+
+ /** Base offset of the first element of a {@code double[]}. */
+ protected static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
+
+ /** Current position in the stream. */
+ protected int pos;
+
+ /** {@inheritDoc} */
+ @Override public int position() {
+ return pos;
+ }
+
+ /**
+ * Advance the position by the given number of bytes.
+ *
+ * @param cnt Byte count.
+ */
+ protected void shift(int cnt) {
+ pos += cnt;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
new file mode 100644
index 0000000..3baa6a5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import java.util.*;
+
+/**
+ * Portable heap input stream backed by a byte array.
+ */
+public final class PortableHeapInputStream extends PortableAbstractInputStream {
+ /** Backing byte array. */
+ private byte[] data;
+
+ /**
+ * Creates a stream over the given array; the stream length is set to the array length.
+ *
+ * @param data Data.
+ */
+ public PortableHeapInputStream(byte[] data) {
+ this.data = data;
+
+ len = data.length;
+ }
+
+ /**
+ * @return Copy of this stream backed by a copy of the data array and set to the same position.
+ */
+ public PortableHeapInputStream copy() {
+ PortableHeapInputStream in = new PortableHeapInputStream(Arrays.copyOf(data, data.length));
+
+ in.position(pos);
+
+ return in;
+ }
+
+ /**
+ * Method called from JNI to resize stream.
+ *
+ * @param len Required length.
+ * @return Underlying byte array (replaced by a larger copy if it was shorter than {@code len}).
+ */
+ public byte[] resize(int len) {
+ if (data.length < len) {
+ byte[] data0 = new byte[len];
+
+ UNSAFE.copyMemory(data, BYTE_ARR_OFF, data0, BYTE_ARR_OFF, data.length);
+
+ data = data0;
+ }
+
+ return data;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int remaining() {
+ return data.length - pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] array() {
+ return data;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] arrayCopy() {
+ byte[] res = new byte[len];
+
+ UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, res.length);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasArray() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected byte readByteAndShift() {
+ return data[pos++];
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void copyAndShift(Object target, long off, int len) {
+ UNSAFE.copyMemory(data, BYTE_ARR_OFF + pos, target, off, len);
+
+ shift(len);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected short readShortFast() {
+ return UNSAFE.getShort(data, BYTE_ARR_OFF + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected char readCharFast() {
+ return UNSAFE.getChar(data, BYTE_ARR_OFF + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int readIntFast() {
+ return UNSAFE.getInt(data, BYTE_ARR_OFF + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long readLongFast() {
+ return UNSAFE.getLong(data, BYTE_ARR_OFF + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int readIntPositioned(int pos) {
+ int res = UNSAFE.getInt(data, BYTE_ARR_OFF + pos);
+
+ if (!LITTLE_ENDIAN)
+ res = Integer.reverseBytes(res);
+
+ return res;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
new file mode 100644
index 0000000..f492449
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import static org.apache.ignite.internal.portable.PortableThreadLocalMemoryAllocator.*;
+
+/**
+ * Portable heap output stream backed by a byte array obtained from a memory allocator.
+ */
+public final class PortableHeapOutputStream extends PortableAbstractOutputStream {
+ /** Default capacity. */
+ private static final int DFLT_CAP = 1024;
+
+ /** Allocator used to allocate, grow and release the backing array. */
+ private final PortableMemoryAllocator alloc;
+
+ /** Backing byte array. */
+ private byte[] data;
+
+ /**
+ * Constructor using the default capacity and {@code DFLT_ALLOC}.
+ */
+ public PortableHeapOutputStream() {
+ this(DFLT_CAP, DFLT_ALLOC);
+ }
+
+ /**
+ * Constructor using {@code THREAD_LOCAL_ALLOC}.
+ *
+ * @param cap Initial capacity.
+ */
+ public PortableHeapOutputStream(int cap) {
+ this(cap, THREAD_LOCAL_ALLOC);
+ }
+
+ /**
+ * Constructor allocating the initial array from the given allocator.
+ *
+ * @param cap Initial capacity.
+ * @param alloc Allocator.
+ */
+ public PortableHeapOutputStream(int cap, PortableMemoryAllocator alloc) {
+ data = alloc.allocate(cap);
+
+ this.alloc = alloc;
+ }
+
+ /**
+ * Constructor wrapping an existing array and using {@code DFLT_ALLOC}.
+ *
+ * @param data Data.
+ */
+ public PortableHeapOutputStream(byte[] data) {
+ this(data, DFLT_ALLOC);
+ }
+
+ /**
+ * Constructor wrapping an existing array.
+ *
+ * @param data Data.
+ * @param alloc Allocator.
+ */
+ public PortableHeapOutputStream(byte[] data, PortableMemoryAllocator alloc) {
+ this.data = data;
+ this.alloc = alloc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ alloc.release(data, pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void ensureCapacity(int cnt) {
+ if (cnt > data.length) {
+ int newCap = capacity(data.length, cnt);
+
+ data = alloc.reallocate(data, newCap);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] array() {
+ return data;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] arrayCopy() {
+ byte[] res = new byte[pos];
+
+ UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, pos);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasArray() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeByteAndShift(byte val) {
+ data[pos++] = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void copyAndShift(Object src, long off, int len) {
+ UNSAFE.copyMemory(src, off, data, BYTE_ARR_OFF + pos, len);
+
+ shift(len);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeShortFast(short val) {
+ UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeCharFast(char val) {
+ UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeIntFast(int val) {
+ UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeLongFast(long val) {
+ UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeIntPositioned(int pos, int val) {
+ if (!LITTLE_ENDIAN)
+ val = Integer.reverseBytes(val);
+
+ UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java
new file mode 100644
index 0000000..cd6e039
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable input stream.
+ */
+public interface PortableInputStream extends PortableStream {
+ /**
+ * Read byte value.
+ *
+ * @return Byte value.
+ */
+ public byte readByte();
+
+ /**
+ * Read byte array.
+ *
+ * @param cnt Expected item count.
+ * @return Byte array.
+ */
+ public byte[] readByteArray(int cnt);
+
+ /**
+ * Reads {@code cnt} bytes into the given byte array.
+ *
+ * @param arr Destination array.
+ * @param off Offset.
+ * @param cnt Number of bytes to read.
+ * @return Actual number of bytes read.
+ */
+ public int read(byte[] arr, int off, int cnt);
+
+ /**
+ * Read boolean value.
+ *
+ * @return Boolean value.
+ */
+ public boolean readBoolean();
+
+ /**
+ * Read boolean array.
+ *
+ * @param cnt Expected item count.
+ * @return Boolean array.
+ */
+ public boolean[] readBooleanArray(int cnt);
+
+ /**
+ * Read short value.
+ *
+ * @return Short value.
+ */
+ public short readShort();
+
+ /**
+ * Read short array.
+ *
+ * @param cnt Expected item count.
+ * @return Short array.
+ */
+ public short[] readShortArray(int cnt);
+
+ /**
+ * Read char value.
+ *
+ * @return Char value.
+ */
+ public char readChar();
+
+ /**
+ * Read char array.
+ *
+ * @param cnt Expected item count.
+ * @return Char array.
+ */
+ public char[] readCharArray(int cnt);
+
+ /**
+ * Read int value.
+ *
+ * @return Int value.
+ */
+ public int readInt();
+
+ /**
+ * Read int value at the given position.
+ *
+ * @param pos Position.
+ * @return Value.
+ */
+ public int readInt(int pos);
+
+ /**
+ * Read int array.
+ *
+ * @param cnt Expected item count.
+ * @return Int array.
+ */
+ public int[] readIntArray(int cnt);
+
+ /**
+ * Read float value.
+ *
+ * @return Float value.
+ */
+ public float readFloat();
+
+ /**
+ * Read float array.
+ *
+ * @param cnt Expected item count.
+ * @return Float array.
+ */
+ public float[] readFloatArray(int cnt);
+
+ /**
+ * Read long value.
+ *
+ * @return Long value.
+ */
+ public long readLong();
+
+ /**
+ * Read long array.
+ *
+ * @param cnt Expected item count.
+ * @return Long array.
+ */
+ public long[] readLongArray(int cnt);
+
+ /**
+ * Read double value.
+ *
+ * @return Double value.
+ */
+ public double readDouble();
+
+ /**
+ * Read double array.
+ *
+ * @param cnt Expected item count.
+ * @return Double array.
+ */
+ public double[] readDoubleArray(int cnt);
+
+ /**
+ * Gets the amount of remaining data in bytes.
+ *
+ * @return Remaining data in bytes.
+ */
+ public int remaining();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
new file mode 100644
index 0000000..071396a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable memory allocator for byte arrays and for memory referenced by address.
+ */
+public interface PortableMemoryAllocator {
+ /** Default memory allocator. */
+ public static final PortableMemoryAllocator DFLT_ALLOC = new PortableSimpleMemoryAllocator();
+
+ /**
+ * Allocate a byte array.
+ *
+ * @param size Size.
+ * @return Data.
+ */
+ public byte[] allocate(int size);
+
+ /**
+ * Reallocate a byte array.
+ *
+ * @param data Current data chunk.
+ * @param size New size required.
+ *
+ * @return Data.
+ */
+ public byte[] reallocate(byte[] data, int size);
+
+ /**
+ * Release a byte array.
+ *
+ * @param data Data.
+ * @param maxMsgSize Max message size sent during the time the allocator is used.
+ */
+ public void release(byte[] data, int maxMsgSize);
+
+ /**
+ * Allocate memory referenced by address.
+ *
+ * @param size Size.
+ * @return Address.
+ */
+ public long allocateDirect(int size);
+
+ /**
+ * Reallocate memory referenced by address.
+ *
+ * @param addr Address.
+ * @param size Size.
+ * @return Address.
+ */
+ public long reallocateDirect(long addr, int size);
+
+ /**
+ * Release memory referenced by address.
+ *
+ * @param addr Address.
+ */
+ public void releaseDirect(long addr);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java
new file mode 100644
index 0000000..bfdd97a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable off-heap input stream reading from memory referenced by a raw pointer.
+ */
+public class PortableOffheapInputStream extends PortableAbstractInputStream {
+ /** Pointer. */
+ private final long ptr;
+
+ /** Capacity. */
+ private final int cap;
+
+ /** Whether {@link #offheapPointer()} must report {@code 0} instead of the real pointer. */
+ private final boolean forceHeap;
+
+ /**
+ * Constructor with {@code forceHeap} disabled.
+ *
+ * @param ptr Pointer.
+ * @param cap Capacity.
+ */
+ public PortableOffheapInputStream(long ptr, int cap) {
+ this(ptr, cap, false);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param ptr Pointer.
+ * @param cap Capacity.
+ * @param forceHeap If {@code true} method {@link #offheapPointer} returns 0 and unmarshalling will
+ * create heap-based objects.
+ */
+ public PortableOffheapInputStream(long ptr, int cap, boolean forceHeap) {
+ this.ptr = ptr;
+ this.cap = cap;
+ this.forceHeap = forceHeap;
+
+ len = cap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int remaining() {
+ return cap - pos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] array() {
+ return arrayCopy();
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] arrayCopy() {
+ byte[] res = new byte[len];
+
+ UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, res.length);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasArray() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected byte readByteAndShift() {
+ return UNSAFE.getByte(ptr + pos++);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void copyAndShift(Object target, long off, int len) {
+ UNSAFE.copyMemory(null, ptr + pos, target, off, len);
+
+ shift(len);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected short readShortFast() {
+ return UNSAFE.getShort(ptr + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected char readCharFast() {
+ return UNSAFE.getChar(ptr + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int readIntFast() {
+ return UNSAFE.getInt(ptr + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long readLongFast() {
+ return UNSAFE.getLong(ptr + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int readIntPositioned(int pos) {
+ int res = UNSAFE.getInt(ptr + pos);
+
+ if (!LITTLE_ENDIAN)
+ res = Integer.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long offheapPointer() {
+ return forceHeap ? 0 : ptr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
new file mode 100644
index 0000000..adfb6bf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable offheap output stream.
+ */
+public class PortableOffheapOutputStream extends PortableAbstractOutputStream {
+ /** Pointer. */
+ private long ptr;
+
+ /** Number of bytes that can be used before resize is necessary. */
+ private int cap;
+
+ /**
+ * Constructor allocating new memory of the given capacity.
+ *
+ * @param cap Capacity.
+ */
+ public PortableOffheapOutputStream(int cap) {
+ this(0, cap);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param ptr Pointer to existing memory, or {@code 0} to allocate new memory.
+ * @param cap Capacity.
+ */
+ public PortableOffheapOutputStream(long ptr, int cap) {
+ this.ptr = ptr == 0 ? allocate(cap) : ptr;
+
+ this.cap = cap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ release(ptr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void ensureCapacity(int cnt) {
+ if (cnt > cap) {
+ int newCap = capacity(cap, cnt);
+
+ ptr = reallocate(ptr, newCap);
+
+ cap = newCap;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] array() {
+ return arrayCopy();
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] arrayCopy() {
+ byte[] res = new byte[pos];
+
+ UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, pos);
+
+ return res;
+ }
+
+ /**
+ * @return Pointer.
+ */
+ public long pointer() {
+ return ptr;
+ }
+
+ /**
+ * @return Capacity.
+ */
+ public int capacity() {
+ return cap;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeByteAndShift(byte val) {
+ UNSAFE.putByte(ptr + pos++, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void copyAndShift(Object src, long offset, int len) {
+ UNSAFE.copyMemory(src, offset, null, ptr + pos, len);
+
+ shift(len);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeShortFast(short val) {
+ UNSAFE.putShort(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeCharFast(char val) {
+ UNSAFE.putChar(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeIntFast(int val) {
+ UNSAFE.putInt(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeLongFast(long val) {
+ UNSAFE.putLong(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeIntPositioned(int pos, int val) {
+ if (!LITTLE_ENDIAN)
+ val = Integer.reverseBytes(val);
+
+ UNSAFE.putInt(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasArray() {
+ return false;
+ }
+
+ /**
+ * Allocate memory.
+ *
+ * @param cap Capacity.
+ * @return Pointer.
+ */
+ protected long allocate(int cap) {
+ return UNSAFE.allocateMemory(cap);
+ }
+
+ /**
+ * Reallocate memory.
+ *
+ * @param ptr Old pointer.
+ * @param cap Capacity.
+ * @return New pointer.
+ */
+ protected long reallocate(long ptr, int cap) {
+ return UNSAFE.reallocateMemory(ptr, cap);
+ }
+
+ /**
+ * Release memory.
+ *
+ * @param ptr Pointer.
+ */
+ protected void release(long ptr) {
+ UNSAFE.freeMemory(ptr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
new file mode 100644
index 0000000..f320566
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable output stream.
+ */
+public interface PortableOutputStream extends PortableStream, AutoCloseable {
+ /**
+ * Write byte value.
+ *
+ * @param val Byte value.
+ */
+ public void writeByte(byte val);
+
+ /**
+ * Write byte array.
+ *
+ * @param val Byte array.
+ */
+ public void writeByteArray(byte[] val);
+
+ /**
+ * Write boolean value.
+ *
+ * @param val Boolean value.
+ */
+ public void writeBoolean(boolean val);
+
+ /**
+ * Write boolean array.
+ *
+ * @param val Boolean array.
+ */
+ public void writeBooleanArray(boolean[] val);
+
+ /**
+ * Write short value.
+ *
+ * @param val Short value.
+ */
+ public void writeShort(short val);
+
+ /**
+ * Write short array.
+ *
+ * @param val Short array.
+ */
+ public void writeShortArray(short[] val);
+
+ /**
+ * Write char value.
+ *
+ * @param val Char value.
+ */
+ public void writeChar(char val);
+
+ /**
+ * Write char array.
+ *
+ * @param val Char array.
+ */
+ public void writeCharArray(char[] val);
+
+ /**
+ * Write int value.
+ *
+ * @param val Int value.
+ */
+ public void writeInt(int val);
+
+ /**
+ * Write int value to the given position.
+ *
+ * @param pos Position.
+ * @param val Value.
+ */
+ public void writeInt(int pos, int val);
+
+ /**
+ * Write int array.
+ *
+ * @param val Int array.
+ */
+ public void writeIntArray(int[] val);
+
+ /**
+ * Write float value.
+ *
+ * @param val Float value.
+ */
+ public void writeFloat(float val);
+
+ /**
+ * Write float array.
+ *
+ * @param val Float array.
+ */
+ public void writeFloatArray(float[] val);
+
+ /**
+ * Write long value.
+ *
+ * @param val Long value.
+ */
+ public void writeLong(long val);
+
+ /**
+ * Write long array.
+ *
+ * @param val Long array.
+ */
+ public void writeLongArray(long[] val);
+
+ /**
+ * Write double value.
+ *
+ * @param val Double value.
+ */
+ public void writeDouble(double val);
+
+ /**
+ * Write double array.
+ *
+ * @param val Double array.
+ */
+ public void writeDoubleArray(double[] val);
+
+ /**
+ * Write a range of a byte array.
+ *
+ * @param arr Array.
+ * @param off Offset.
+ * @param len Length.
+ */
+ public void write(byte[] arr, int off, int len);
+
+ /**
+ * Write data from unmanaged memory.
+ *
+ * @param addr Address.
+ * @param cnt Count.
+ */
+ public void write(long addr, int cnt);
+
+ /**
+ * Close the stream releasing resources.
+ */
+ @Override public void close();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java
new file mode 100644
index 0000000..6021140
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import org.apache.ignite.internal.util.*;
+
+import sun.misc.*;
+
+/**
+ * Naive implementation of portable memory allocator.
+ */
+public class PortableSimpleMemoryAllocator implements PortableMemoryAllocator {
+ /** Unsafe. */
+ private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+ /** Base offset of the first element of a {@code byte[]}. */
+ protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+ /** {@inheritDoc} */
+ @Override public byte[] allocate(int size) {
+ return new byte[size];
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] reallocate(byte[] data, int size) {
+ byte[] newData = new byte[size];
+ // Never copy more than the new array can hold: Unsafe performs no bounds checking.
+ UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, Math.min(data.length, size));
+
+ return newData;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void release(byte[] data, int maxMsgSize) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long allocateDirect(int size) {
+ return UNSAFE.allocateMemory(size);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long reallocateDirect(long addr, int size) {
+ return UNSAFE.reallocateMemory(addr, size);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void releaseDirect(long addr) {
+ UNSAFE.freeMemory(addr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java
new file mode 100644
index 0000000..5c84609
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Positioned portable stream whose data may be exposed as a byte array or as an offheap pointer.
+ */
+public interface PortableStream {
+    /**
+     * @return Position in the stream.
+     */
+    int position();
+
+    /**
+     * @param pos Position in the stream.
+     */
+    void position(int pos);
+
+    /**
+     * @return Array underlying the stream.
+     */
+    byte[] array();
+
+    /**
+     * @return Copy of the data held by the stream.
+     */
+    byte[] arrayCopy();
+
+    /**
+     * @return Offheap pointer when the stream is offheap based, {@code 0} otherwise.
+     */
+    long offheapPointer();
+
+    /**
+     * @return {@code True} if the stream is array based.
+     */
+    boolean hasArray();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
index 10deaf2..eed2cc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
@@ -61,7 +61,7 @@ public class CacheObjectPortableContext extends CacheObjectContext {
if (o == null)
return null;
- if (keepPortable || !portableEnabled() || !GridPortableUtils.isPortableOrCollectionType(o.getClass()))
+ if (keepPortable || !portableEnabled() || !PortableUtils.isPortableOrCollectionType(o.getClass()))
return o;
return unwrapPortable(o);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
index 15b2f00..cf5106a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.cacheobject.*;
-import org.apache.ignite.internal.processors.portable.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.tostring.*;
@@ -86,7 +85,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
};
/** */
- private GridPortableContext portableCtx;
+ private PortableContext portableCtx;
/** */
private Marshaller marsh;
@@ -99,7 +98,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
private IgnitePortables portables;
/** Metadata updates collected before metadata cache is initialized. */
- private final Map<Integer, GridPortableMetaDataImpl> metaBuf = new ConcurrentHashMap<>();
+ private final Map<Integer, PortableMetaDataImpl> metaBuf = new ConcurrentHashMap<>();
/** */
private UUID metaCacheQryId;
@@ -202,18 +201,18 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
if (marsh instanceof PortableMarshaller) {
- GridPortableMetaDataHandler metaHnd = new GridPortableMetaDataHandler() {
- @Override public void addMeta(int typeId, GridPortableMetaDataImpl newMeta)
+ PortableMetaDataHandler metaHnd = new PortableMetaDataHandler() {
+ @Override public void addMeta(int typeId, PortableMetaDataImpl newMeta)
throws PortableException {
if (metaDataCache == null) {
- GridPortableMetaDataImpl oldMeta = metaBuf.get(typeId);
+ PortableMetaDataImpl oldMeta = metaBuf.get(typeId);
if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta, null)) {
synchronized (this) {
Map<String, String> fields = new HashMap<>();
if (checkMeta(typeId, oldMeta, newMeta, fields)) {
- newMeta = new GridPortableMetaDataImpl(newMeta.typeName(),
+ newMeta = new PortableMetaDataImpl(newMeta.typeName(),
fields,
newMeta.affinityKeyFieldName());
@@ -243,7 +242,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
PortableMarshaller pMarh0 = (PortableMarshaller)marsh;
- portableCtx = new GridPortableContext(metaHnd, ctx.gridName());
+ portableCtx = new PortableContext(metaHnd, ctx.gridName());
IgniteUtils.invoke(PortableMarshaller.class, pMarh0, "setPortableContext", portableCtx);
@@ -308,7 +307,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
startLatch.countDown();
- for (Map.Entry<Integer, GridPortableMetaDataImpl> e : metaBuf.entrySet())
+ for (Map.Entry<Integer, PortableMetaDataImpl> e : metaBuf.entrySet())
addMeta(e.getKey(), e.getValue());
metaBuf.clear();
@@ -381,7 +380,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
if (type != CacheObject.TYPE_BYTE_ARR) {
assert size > 0 : size;
- GridPortableInputStream in = new GridPortableOffheapInputStream(ptr, size, forceHeap);
+ PortableInputStream in = new PortableOffheapInputStream(ptr, size, forceHeap);
return portableMarsh.unmarshal(in);
}
@@ -394,7 +393,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
if (obj == null)
return null;
- if (GridPortableUtils.isPortableType(obj.getClass()))
+ if (PortableUtils.isPortableType(obj.getClass()))
return obj;
if (obj instanceof Object[]) {
@@ -444,7 +443,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
assert obj0 instanceof PortableObject;
- ((GridPortableObjectImpl)obj0).detachAllowed(true);
+ ((PortableObjectImpl)obj0).detachAllowed(true);
return obj0;
}
@@ -458,24 +457,24 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
/** {@inheritDoc} */
@Override public PortableBuilder builder(int typeId) {
- return new GridPortableBuilderImpl(portableCtx, typeId);
+ return new PortableBuilderImpl(portableCtx, typeId);
}
/** {@inheritDoc} */
@Override public PortableBuilder builder(String clsName) {
- return new GridPortableBuilderImpl(portableCtx, clsName);
+ return new PortableBuilderImpl(portableCtx, clsName);
}
/** {@inheritDoc} */
@Override public PortableBuilder builder(PortableObject portableObj) {
- return GridPortableBuilderImpl.wrap(portableObj);
+ return PortableBuilderImpl.wrap(portableObj);
}
/** {@inheritDoc} */
@Override public void updateMetaData(int typeId, String typeName, @Nullable String affKeyFieldName,
Map<String, Integer> fieldTypeIds) throws PortableException {
portableCtx.updateMetaData(typeId,
- new GridPortableMetaDataImpl(typeName, fieldTypeNames(fieldTypeIds), affKeyFieldName));
+ new PortableMetaDataImpl(typeName, fieldTypeNames(fieldTypeIds), affKeyFieldName));
}
/** {@inheritDoc} */
@@ -613,7 +612,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
/**
* @return Portable context.
*/
- public GridPortableContext portableContext() {
+ public PortableContext portableContext() {
return portableCtx;
}
@@ -670,7 +669,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
obj = toPortable(obj);
if (obj instanceof PortableObject)
- return (GridPortableObjectImpl)obj;
+ return (PortableObjectImpl)obj;
}
return toCacheKeyObject0(obj, userObj);
@@ -687,15 +686,15 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
obj = toPortable(obj);
if (obj instanceof PortableObject)
- return (GridPortableObjectImpl)obj;
+ return (PortableObjectImpl)obj;
return toCacheObject0(obj, userObj);
}
/** {@inheritDoc} */
@Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) {
- if (type == GridPortableObjectImpl.TYPE_PORTABLE)
- return new GridPortableObjectImpl(portableContext(), bytes, 0);
+ if (type == PortableObjectImpl.TYPE_PORTABLE)
+ return new PortableObjectImpl(portableContext(), bytes, 0);
return super.toCacheObject(ctx, type, bytes);
}
@@ -708,8 +707,8 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
Object val = unmarshal(valPtr, !tmp);
- if (val instanceof GridPortableObjectOffheapImpl)
- return (GridPortableObjectOffheapImpl)val;
+ if (val instanceof PortableObjectOffheapImpl)
+ return (PortableObjectOffheapImpl)val;
return new CacheObjectImpl(val, null);
}
@@ -719,8 +718,8 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
if (!((CacheObjectPortableContext)ctx.cacheObjectContext()).portableEnabled())
return obj;
- if (obj instanceof GridPortableObjectOffheapImpl)
- return ((GridPortableObjectOffheapImpl)obj).heapCopy();
+ if (obj instanceof PortableObjectOffheapImpl)
+ return ((PortableObjectOffheapImpl)obj).heapCopy();
return obj;
}
@@ -752,8 +751,8 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
PortableMetadata newMeta, @Nullable Map<String, String> fields) throws PortableException {
assert newMeta != null;
- Map<String, String> oldFields = oldMeta != null ? ((GridPortableMetaDataImpl)oldMeta).fieldsMeta() : null;
- Map<String, String> newFields = ((GridPortableMetaDataImpl)newMeta).fieldsMeta();
+ Map<String, String> oldFields = oldMeta != null ? ((PortableMetaDataImpl)oldMeta).fieldsMeta() : null;
+ Map<String, String> newFields = ((PortableMetaDataImpl)newMeta).fieldsMeta();
boolean changed = false;
@@ -851,7 +850,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
Map<String, String> fields = new HashMap<>();
if (checkMeta(typeId, oldMeta, newMeta, fields)) {
- PortableMetadata res = new GridPortableMetaDataImpl(newMeta.typeName(),
+ PortableMetadata res = new PortableMetaDataImpl(newMeta.typeName(),
fields,
newMeta.affinityKeyFieldName());