You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/08/26 08:36:06 UTC
[2/4] ignite git commit: Moving platform classes to
"...internal.processors..." package to follow Ignite common approach.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
new file mode 100644
index 0000000..7e9587f
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.callback;
+
+/**
+ * Platform callback utility methods. Implemented in target platform. All methods in this class must be
+ * package-visible and invoked only through {@link PlatformCallbackGateway}.
+ */
+public class PlatformCallbackUtils {
+ /**
+ * Create cache store.
+ *
+ * @param envPtr Environment pointer.
+ * @param memPtr Memory pointer.
+ * @return Pointer.
+ */
+ static native long cacheStoreCreate(long envPtr, long memPtr);
+
+ /**
+ * @param envPtr Environment pointer.
+ * @param objPtr Object pointer.
+ * @param memPtr Memory pointer.
+ * @param cb Callback.
+ * @return Result.
+ */
+ static native int cacheStoreInvoke(long envPtr, long objPtr, long memPtr, Object cb);
+
+ /**
+ * @param envPtr Environment pointer.
+ * @param objPtr Object pointer.
+ */
+ static native void cacheStoreDestroy(long envPtr, long objPtr);
+
+ /**
+ * Creates cache store session.
+ *
+ * @param envPtr Environment pointer.
+ * @param storePtr Store instance pointer.
+ * @return Session instance pointer.
+ */
+ static native long cacheStoreSessionCreate(long envPtr, long storePtr);
+
+ /**
+ * Creates cache entry filter and returns a pointer.
+ *
+ * @param envPtr Environment pointer.
+ * @param memPtr Memory pointer.
+ * @return Pointer.
+ */
+ static native long cacheEntryFilterCreate(long envPtr, long memPtr);
+
+ /**
+ * @param envPtr Environment pointer.
+ * @param objPtr Pointer.
+ * @param memPtr Memory pointer.
+ * @return Result.
+ */
+ static native int cacheEntryFilterApply(long envPtr, long objPtr, long memPtr);
+
+ /**
+ * @param envPtr Environment pointer.
+ * @param objPtr Pointer.
+ */
+ static native void cacheEntryFilterDestroy(long envPtr, long objPtr);
+
+ /**
+ * Invoke cache entry processor.
+ *
+ * @param envPtr Environment pointer.
+ * @param outMemPtr Output memory pointer.
+ * @param inMemPtr Input memory pointer.
+ */
+ static native void cacheInvoke(long envPtr, long outMemPtr, long inMemPtr);
+
+ /**
+ * Perform native task map. Do not throw exceptions, serializing them to the output stream instead.
+ *
+ * @param envPtr Environment pointer.
+ * @param taskPtr Task pointer.
+ * @param outMemPtr Output memory pointer (exists if topology changed, otherwise {@code 0}).
+ * @param inMemPtr Input memory pointer.
+ */
+ static native void computeTaskMap(long envPtr, long taskPtr, long outMemPtr, long inMemPtr);
+
+ /**
+ * Perform native task job result notification.
+ *
+ * @param envPtr Environment pointer.
+ * @param taskPtr Task pointer.
+ * @param jobPtr Job pointer.
+ * @param memPtr Memory pointer (always zero for local job execution).
+ * @return Job result enum ordinal.
+ */
+ static native int computeTaskJobResult(long envPtr, long taskPtr, long jobPtr, long memPtr);
+
+ /**
+ * Perform native task reduce.
+ *
+ * @param envPtr Environment pointer.
+ * @param taskPtr Task pointer.
+ */
+ static native void computeTaskReduce(long envPtr, long taskPtr);
+
+ /**
+ * Complete native task, passing error data if the task failed.
+ *
+ * @param envPtr Environment pointer.
+ * @param taskPtr Task pointer.
+ * @param memPtr Memory pointer with exception data or {@code 0} in case of success.
+ */
+ static native void computeTaskComplete(long envPtr, long taskPtr, long memPtr);
+
+ /**
+ * Serialize native job.
+ *
+ * @param envPtr Environment pointer.
+ * @param jobPtr Job pointer.
+ * @param memPtr Memory pointer.
+ * @return {@code True} if serialization succeeded.
+ */
+ static native int computeJobSerialize(long envPtr, long jobPtr, long memPtr);
+
+ /**
+ * Create job in native platform.
+ *
+ * @param envPtr Environment pointer.
+ * @param memPtr Memory pointer.
+ * @return Pointer to job.
+ */
+ static native long computeJobCreate(long envPtr, long memPtr);
+
+ /**
+ * Execute native job on a node other than where it was created.
+ *
+ * @param envPtr Environment pointer.
+ * @param jobPtr Job pointer.
+ * @param cancel Cancel flag.
+ * @param memPtr Memory pointer to write result to for remote job execution or {@code 0} for local job execution.
+ */
+ static native void computeJobExecute(long envPtr, long jobPtr, int cancel, long memPtr);
+
+ /**
+ * Cancel the job.
+ *
+ * @param envPtr Environment pointer.
+ * @param jobPtr Job pointer.
+ */
+ static native void computeJobCancel(long envPtr, long jobPtr);
+
+ /**
+ * Destroy the job.
+ *
+ * @param envPtr Environment pointer.
+ * @param ptr Pointer.
+ */
+ static native void computeJobDestroy(long envPtr, long ptr);
+
+ /**
+ * Invoke local callback.
+ *
+ * @param envPtr Environment pointer.
+ * @param cbPtr Callback pointer.
+ * @param memPtr Memory pointer.
+ */
+ static native void continuousQueryListenerApply(long envPtr, long cbPtr, long memPtr);
+
+ /**
+ * Create filter in native platform.
+ *
+ * @param envPtr Environment pointer.
+ * @param memPtr Memory pointer.
+ * @return Pointer to created filter.
+ */
+ static native long continuousQueryFilterCreate(long envPtr, long memPtr);
+
+ /**
+ * Invoke remote filter.
+ *
+ * @param envPtr Environment pointer.
+ * @param filterPtr Filter pointer.
+ * @param memPtr Memory pointer.
+ * @return Result.
+ */
+ static native int continuousQueryFilterApply(long envPtr, long filterPtr, long memPtr);
+
+ /**
+ * Release remote filter.
+ *
+ * @param envPtr Environment pointer.
+ * @param filterPtr Filter pointer.
+ */
+ static native void continuousQueryFilterRelease(long envPtr, long filterPtr);
+
+ /**
+ * Notify native data streamer about topology update.
+ *
+ * @param envPtr Environment pointer.
+ * @param ptr Data streamer native pointer.
+ * @param topVer Topology version.
+ * @param topSize Topology size.
+ */
+ static native void dataStreamerTopologyUpdate(long envPtr, long ptr, long topVer, int topSize);
+
+ /**
+ * Invoke stream receiver.
+ *
+ * @param envPtr Environment pointer.
+ * @param ptr Receiver native pointer.
+ * @param cache Cache object.
+ * @param memPtr Stream pointer.
+ * @param keepPortable Portable flag.
+ */
+ static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, Object cache, long memPtr,
+ boolean keepPortable);
+
+ /**
+ * Notify future with byte result.
+ *
+ * @param envPtr Environment pointer.
+ * @param futPtr Future pointer.
+ * @param res Result.
+ */
+ static native void futureByteResult(long envPtr, long futPtr, int res);
+
+ /**
+ * Notify future with boolean result.
+ *
+ * @param envPtr Environment pointer.
+ * @param futPtr Future pointer.
+ * @param res Result.
+ */
+ static native void futureBoolResult(long envPtr, long futPtr, int res);
+
+ /**
+ * Notify future with short result.
+ *
+ * @param envPtr Environment pointer.
+ * @param futPtr Future pointer.
+ * @param res Result.
+ */
+ static native void futureShortResult(long envPtr, long futPtr, int res);
+
+ /**
+ * Notify future with char result.
+ *
+ * @param envPtr Environment pointer.
+ * @param futPtr Future pointer.
+ * @param res Result.
+ */
+ static native void futureCharResult(long envPtr, long futPtr, int res);
+
+ /**
+ * Notify future with int result.
+ *
+ * @param envPtr Environment pointer.
+ * @param futPtr Future pointer.
+ * @param res Result.
+ */
+ static native void futureIntResult(long envPtr, long futPtr, int res);
+
+ /**
+ * Notify future with float result.
+ *
+ * @param envPtr Environment pointer.
+ * @param futPtr Future pointer.
+ * @param res Result.
+ */
+ static native void futureFloatResult(long envPtr, long futPtr, float res);
+
+ /**
+ * Notify future with long result.
+ *
+ * @param envPtr Environment pointer.
+ * @param futPtr Future pointer.
+ * @param res Result.
+ */
+ static native void futureLongResult(long envPtr, long futPtr, long res);
+
+ /**
+ * Notify future with double result.
+ *
+ * @param envPtr Environment pointer.
+ * @param futPtr Future pointer.
+ * @param res Result.
+ */
+ static native void futureDoubleResult(long envPtr, long futPtr, double res);
+
+ /**
+ * Notify future with object result.
+ *
+ * @param envPtr Environment pointer.
+ * @param futPtr Future pointer.
+ * @param memPtr Memory pointer.
+ */
+ static native void futureObjectResult(long envPtr, long futPtr, long memPtr);
+
+ /**
+ * Notify future with null result.
+ *
+ * @param envPtr Environment pointer.
+ * @param futPtr Future pointer.
+ */
+ static native void futureNullResult(long envPtr, long futPtr);
+
+ /**
+ * Notify future with error.
+ *
+ * @param envPtr Environment pointer.
+ * @param futPtr Future pointer.
+ * @param memPtr Pointer to memory with error information.
+ */
+ static native void futureError(long envPtr, long futPtr, long memPtr);
+
+ /**
+ * Creates message filter and returns a pointer.
+ *
+ * @param envPtr Environment pointer.
+ * @param memPtr Memory pointer.
+ * @return Pointer.
+ */
+ static native long messagingFilterCreate(long envPtr, long memPtr);
+
+ /**
+ * @param envPtr Environment pointer.
+ * @param objPtr Pointer.
+ * @param memPtr Memory pointer.
+ * @return Result.
+ */
+ static native int messagingFilterApply(long envPtr, long objPtr, long memPtr);
+
+ /**
+ * @param envPtr Environment pointer.
+ * @param objPtr Pointer.
+ */
+ static native void messagingFilterDestroy(long envPtr, long objPtr);
+
+ /**
+ * Creates event filter and returns a pointer.
+ *
+ * @param envPtr Environment pointer.
+ * @param memPtr Memory pointer.
+ * @return Pointer.
+ */
+ static native long eventFilterCreate(long envPtr, long memPtr);
+
+ /**
+ * @param envPtr Environment pointer.
+ * @param objPtr Pointer.
+ * @param memPtr Memory pointer.
+ * @return Result.
+ */
+ static native int eventFilterApply(long envPtr, long objPtr, long memPtr);
+
+ /**
+ * @param envPtr Environment pointer.
+ * @param objPtr Pointer.
+ */
+ static native void eventFilterDestroy(long envPtr, long objPtr);
+
+ /**
+ * Sends node info to native target.
+ *
+ * @param envPtr Environment pointer.
+ * @param memPtr Ptr to a stream with serialized node.
+ */
+ static native void nodeInfo(long envPtr, long memPtr);
+
+ /**
+ * Kernal start callback.
+ *
+ * @param envPtr Environment pointer.
+ * @param memPtr Memory pointer.
+ */
+ static native void onStart(long envPtr, long memPtr);
+
+ /**
+ * Kernal stop callback.
+ *
+ * @param envPtr Environment pointer.
+ */
+ static native void onStop(long envPtr);
+
+ /**
+ * Lifecycle event callback.
+ *
+ * @param envPtr Environment pointer.
+ * @param ptr Holder pointer.
+ * @param evt Event.
+ */
+ static native void lifecycleEvent(long envPtr, long ptr, int evt);
+
+ /**
+ * Re-allocate external memory chunk.
+ *
+ * @param envPtr Environment pointer.
+ * @param memPtr Cross-platform pointer.
+ * @param cap Capacity.
+ */
+ static native void memoryReallocate(long envPtr, long memPtr, int cap);
+
+ /**
+ * Initializes native service.
+ *
+ * @param envPtr Environment pointer.
+ * @param memPtr Stream pointer.
+ * @return Pointer to the native platform service.
+ */
+ static native long serviceInit(long envPtr, long memPtr);
+
+ /**
+ * Executes native service.
+ *
+ * @param envPtr Environment pointer.
+ * @param svcPtr Pointer to the service in the native platform.
+ * @param memPtr Stream pointer.
+ */
+ static native void serviceExecute(long envPtr, long svcPtr, long memPtr);
+
+ /**
+ * Cancels native service.
+ *
+ * @param envPtr Environment pointer.
+ * @param svcPtr Pointer to the service in the native platform.
+ * @param memPtr Stream pointer.
+ */
+ static native void serviceCancel(long envPtr, long svcPtr, long memPtr);
+
+ /**
+ /**
+ * Invokes service method.
+ *
+ * @param envPtr Environment pointer.
+ * @param svcPtr Pointer to the service in the native platform.
+ * @param outMemPtr Output memory pointer.
+ * @param inMemPtr Input memory pointer.
+ */
+ static native void serviceInvokeMethod(long envPtr, long svcPtr, long outMemPtr, long inMemPtr);
+
+ /**
+ * Invokes cluster node filter.
+ *
+ * @param envPtr Environment pointer.
+ * @param memPtr Stream pointer.
+ */
+ static native int clusterNodeFilterApply(long envPtr, long memPtr);
+
+ /**
+ * Private constructor.
+ */
+ private PlatformCallbackUtils() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformAbstractMemory.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformAbstractMemory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformAbstractMemory.java
new file mode 100644
index 0000000..fbbabb7
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformAbstractMemory.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+/**
+ * Interop memory chunk abstraction.
+ */
+public abstract class PlatformAbstractMemory implements PlatformMemory {
+ /** Stream factory. */
+ private static final StreamFactory STREAM_FACTORY = PlatformMemoryUtils.LITTLE_ENDIAN ?
+ new LittleEndianStreamFactory() : new BigEndianStreamFactory();
+
+ /** Cross-platform memory pointer. */
+ protected long memPtr;
+
+ /**
+ * Constructor.
+ *
+ * @param memPtr Cross-platform memory pointer.
+ */
+ protected PlatformAbstractMemory(long memPtr) {
+ this.memPtr = memPtr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformInputStream input() {
+ return STREAM_FACTORY.createInput(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformOutputStream output() {
+ return STREAM_FACTORY.createOutput(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long pointer() {
+ return memPtr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long data() {
+ return PlatformMemoryUtils.data(memPtr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int capacity() {
+ return PlatformMemoryUtils.capacity(memPtr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int length() {
+ return PlatformMemoryUtils.length(memPtr);
+ }
+
+ /**
+ * Stream factory.
+ */
+ private static interface StreamFactory {
+ /**
+ * Create input stream.
+ *
+ * @param mem Memory.
+ * @return Input stream.
+ */
+ PlatformInputStreamImpl createInput(PlatformMemory mem);
+
+ /**
+ * Create output stream.
+ *
+ * @param mem Memory.
+ * @return Output stream.
+ */
+ PlatformOutputStreamImpl createOutput(PlatformMemory mem);
+ }
+
+ /**
+ * Stream factory for LITTLE ENDIAN architecture.
+ */
+ private static class LittleEndianStreamFactory implements StreamFactory {
+ /** {@inheritDoc} */
+ @Override public PlatformInputStreamImpl createInput(PlatformMemory mem) {
+ return new PlatformInputStreamImpl(mem);
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformOutputStreamImpl createOutput(PlatformMemory mem) {
+ return new PlatformOutputStreamImpl(mem);
+ }
+ }
+
+ /**
+ * Stream factory for BIG ENDIAN architecture.
+ */
+ private static class BigEndianStreamFactory implements StreamFactory {
+ /** {@inheritDoc} */
+ @Override public PlatformInputStreamImpl createInput(PlatformMemory mem) {
+ return new PlatformBigEndianInputStreamImpl(mem);
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformOutputStreamImpl createOutput(PlatformMemory mem) {
+ return new PlatformBigEndianOutputStreamImpl(mem);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java
new file mode 100644
index 0000000..b029ee0
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.platform.memory;
+
+/**
+ * Interop input stream implementation working with BIG ENDIAN architecture.
+ */
+public class PlatformBigEndianInputStreamImpl extends PlatformInputStreamImpl {
+ /**
+ * Constructor.
+ *
+ * @param mem Memory chunk.
+ */
+ public PlatformBigEndianInputStreamImpl(PlatformMemory mem) {
+ super(mem);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short readShort() {
+ return Short.reverseBytes(super.readShort());
+ }
+
+ /** {@inheritDoc} */
+ @Override public short[] readShortArray(int cnt) {
+ short[] res = super.readShortArray(cnt);
+
+ for (int i = 0; i < cnt; i++)
+ res[i] = Short.reverseBytes(res[i]);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public char readChar() {
+ return Character.reverseBytes(super.readChar());
+ }
+
+ /** {@inheritDoc} */
+ @Override public char[] readCharArray(int cnt) {
+ char[] res = super.readCharArray(cnt);
+
+ for (int i = 0; i < cnt; i++)
+ res[i] = Character.reverseBytes(res[i]);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readInt() {
+ return Integer.reverseBytes(super.readInt());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readInt(int pos) {
+ return Integer.reverseBytes(super.readInt(pos));
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] readIntArray(int cnt) {
+ int[] res = super.readIntArray(cnt);
+
+ for (int i = 0; i < cnt; i++)
+ res[i] = Integer.reverseBytes(res[i]);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float readFloat() {
+ return Float.intBitsToFloat(Integer.reverseBytes(Float.floatToIntBits(super.readFloat())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public float[] readFloatArray(int cnt) {
+ float[] res = super.readFloatArray(cnt);
+
+ for (int i = 0; i < cnt; i++)
+ res[i] = Float.intBitsToFloat(Integer.reverseBytes(Float.floatToIntBits(res[i])));
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long readLong() {
+ return Long.reverseBytes(super.readLong());
+ }
+
+ /** {@inheritDoc} */
+ @Override public long[] readLongArray(int cnt) {
+ long[] res = super.readLongArray(cnt);
+
+ for (int i = 0; i < cnt; i++)
+ res[i] = Long.reverseBytes(res[i]);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public double readDouble() {
+ return Double.longBitsToDouble(Long.reverseBytes(Double.doubleToLongBits(super.readDouble())));
+ }
+
+ /** {@inheritDoc} */
+ @Override public double[] readDoubleArray(int cnt) {
+ double[] res = super.readDoubleArray(cnt);
+
+ for (int i = 0; i < cnt; i++)
+ res[i] = Double.longBitsToDouble(Long.reverseBytes(Double.doubleToLongBits(res[i])));
+
+ return res;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
new file mode 100644
index 0000000..e1c1585
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*;
+
+/**
+ * Interop output stream implementation working with BIG ENDIAN architecture.
+ */
+public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl {
+ /**
+ * Constructor.
+ *
+ * @param mem Underlying memory chunk.
+ */
+ public PlatformBigEndianOutputStreamImpl(PlatformMemory mem) {
+ super(mem);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeShort(short val) {
+ super.writeShort(Short.reverseBytes(val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeShortArray(short[] val) {
+ int cnt = val.length << 1;
+
+ ensureCapacity(pos + cnt);
+
+ long startPos = data + pos;
+
+ for (short item : val) {
+ UNSAFE.putShort(startPos, Short.reverseBytes(item));
+
+ startPos += 2;
+ }
+
+ shift(cnt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeChar(char val) {
+ super.writeChar(Character.reverseBytes(val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeCharArray(char[] val) {
+ int cnt = val.length << 1;
+
+ ensureCapacity(pos + cnt);
+
+ long startPos = data + pos;
+
+ for (char item : val) {
+ UNSAFE.putChar(startPos, Character.reverseBytes(item));
+
+ startPos += 2;
+ }
+
+ shift(cnt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeInt(int val) {
+ super.writeInt(Integer.reverseBytes(val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeIntArray(int[] val) {
+ int cnt = val.length << 2;
+
+ ensureCapacity(pos + cnt);
+
+ long startPos = data + pos;
+
+ for (int item : val) {
+ UNSAFE.putInt(startPos, Integer.reverseBytes(item));
+
+ startPos += 4;
+ }
+
+ shift(cnt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeInt(int pos, int val) {
+ super.writeInt(pos, Integer.reverseBytes(val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeFloatArray(float[] val) {
+ int cnt = val.length << 2;
+
+ ensureCapacity(pos + cnt);
+
+ long startPos = data + pos;
+
+ for (float item : val) {
+ UNSAFE.putInt(startPos, Integer.reverseBytes(Float.floatToIntBits(item)));
+
+ startPos += 4;
+ }
+
+ shift(cnt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLong(long val) {
+ super.writeLong(Long.reverseBytes(val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLongArray(long[] val) {
+ int cnt = val.length << 3;
+
+ ensureCapacity(pos + cnt);
+
+ long startPos = data + pos;
+
+ for (long item : val) {
+ UNSAFE.putLong(startPos, Long.reverseBytes(item));
+
+ startPos += 8;
+ }
+
+ shift(cnt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeDoubleArray(double[] val) {
+ int cnt = val.length << 3;
+
+ ensureCapacity(pos + cnt);
+
+ long startPos = data + pos;
+
+ for (double item : val) {
+ UNSAFE.putLong(startPos, Long.reverseBytes(Double.doubleToLongBits(item)));
+
+ startPos += 8;
+ }
+
+ shift(cnt);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformExternalMemory.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformExternalMemory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformExternalMemory.java
new file mode 100644
index 0000000..0d47aff
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformExternalMemory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.platform.callback.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Interop external memory chunk.
+ */
+public class PlatformExternalMemory extends PlatformAbstractMemory {
+ /** Native gateway. */
+ private final PlatformCallbackGateway gate;
+
+ /**
+ * Constructor.
+ *
+ * @param gate Native gateway.
+ * @param memPtr Memory pointer.
+ */
+ public PlatformExternalMemory(@Nullable PlatformCallbackGateway gate, long memPtr) {
+ super(memPtr);
+
+ this.gate = gate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reallocate(int cap) {
+ if (gate == null)
+ throw new IgniteException("Failed to re-allocate external memory chunk because it is read-only.");
+
+ gate.memoryReallocate(memPtr, cap);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ // Do nothing, memory must be released by native platform.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java
new file mode 100644
index 0000000..9273e29
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import org.apache.ignite.internal.portable.streams.*;
+
+/**
+ * Interop input stream.
+ */
+public interface PlatformInputStream extends PortableInputStream {
+ /**
+ * Synchronize input. Must be called before reading data from memory that was changed by another platform.
+ */
+ public void synchronize();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java
new file mode 100644
index 0000000..68beaee
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import org.apache.ignite.*;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*;
+
+/**
+ * Interop input stream implementation which reads values directly from a {@link PlatformMemory} chunk.
+ * <p>
+ * The data address and the length are captured from the chunk on construction and refreshed by
+ * {@link #synchronize()}; every read is bounds-checked against that length before memory is touched.
+ */
+public class PlatformInputStreamImpl implements PlatformInputStream {
+    /** Underlying memory. */
+    private final PlatformMemory mem;
+
+    /** Real data pointer. */
+    private long data;
+
+    /** Amount of available data. */
+    private int len;
+
+    /** Current position. */
+    private int pos;
+
+    /** Heap-copied data; created lazily by {@link #arrayCopy()} and dropped by {@link #synchronize()}. */
+    private byte[] dataCopy;
+
+    /**
+     * Constructor.
+     *
+     * @param mem Underlying memory chunk.
+     */
+    public PlatformInputStreamImpl(PlatformMemory mem) {
+        this.mem = mem;
+
+        data = mem.data();
+        len = mem.length();
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte readByte() {
+        ensureEnoughData(1);
+
+        return UNSAFE.getByte(data + pos++);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] readByteArray(int cnt) {
+        byte[] res = new byte[cnt];
+
+        copyAndShift(res, BYTE_ARR_OFF, cnt);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readBoolean() {
+        return readByte() == 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean[] readBooleanArray(int cnt) {
+        boolean[] res = new boolean[cnt];
+
+        copyAndShift(res, BOOLEAN_ARR_OFF, cnt);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short readShort() {
+        ensureEnoughData(2);
+
+        short res = UNSAFE.getShort(data + pos);
+
+        shift(2);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short[] readShortArray(int cnt) {
+        // Size in bytes; a negative result (int overflow) is rejected by ensureEnoughData().
+        int bytes = cnt << 1;
+
+        short[] res = new short[cnt];
+
+        copyAndShift(res, SHORT_ARR_OFF, bytes);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public char readChar() {
+        ensureEnoughData(2);
+
+        char res = UNSAFE.getChar(data + pos);
+
+        shift(2);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public char[] readCharArray(int cnt) {
+        int bytes = cnt << 1;
+
+        char[] res = new char[cnt];
+
+        copyAndShift(res, CHAR_ARR_OFF, bytes);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readInt() {
+        ensureEnoughData(4);
+
+        int res = UNSAFE.getInt(data + pos);
+
+        shift(4);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readInt(int pos) {
+        // Check against the chunk bounds directly: "pos + 4" may overflow for huge positions, and a negative
+        // position would read memory located before the chunk. The current position is not changed.
+        if (pos < 0 || pos > len - 4)
+            throw new IgniteException("Not enough data to read the value [position=" + pos +
+                ", requiredBytes=4, length=" + len + ']');
+
+        return UNSAFE.getInt(data + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int[] readIntArray(int cnt) {
+        int bytes = cnt << 2;
+
+        int[] res = new int[cnt];
+
+        copyAndShift(res, INT_ARR_OFF, bytes);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float readFloat() {
+        ensureEnoughData(4);
+
+        float res = UNSAFE.getFloat(data + pos);
+
+        shift(4);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float[] readFloatArray(int cnt) {
+        int bytes = cnt << 2;
+
+        float[] res = new float[cnt];
+
+        copyAndShift(res, FLOAT_ARR_OFF, bytes);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long readLong() {
+        ensureEnoughData(8);
+
+        long res = UNSAFE.getLong(data + pos);
+
+        shift(8);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long[] readLongArray(int cnt) {
+        int bytes = cnt << 3;
+
+        long[] res = new long[cnt];
+
+        copyAndShift(res, LONG_ARR_OFF, bytes);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double readDouble() {
+        ensureEnoughData(8);
+
+        double res = UNSAFE.getDouble(data + pos);
+
+        shift(8);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double[] readDoubleArray(int cnt) {
+        int bytes = cnt << 3;
+
+        double[] res = new double[cnt];
+
+        copyAndShift(res, DOUBLE_ARR_OFF, bytes);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(byte[] arr, int off, int len) {
+        // Never read past the end of the stream: clamp the request to what is left.
+        if (len > remaining())
+            len = remaining();
+
+        copyAndShift(arr, BYTE_ARR_OFF + off, len);
+
+        return len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int remaining() {
+        return len - pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int position() {
+        return pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void position(int pos) {
+        // A negative position would make subsequent reads touch memory located before the chunk.
+        if (pos < 0 || pos > len)
+            throw new IgniteException("Position is out of bounds: " + pos);
+
+        this.pos = pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] array() {
+        return arrayCopy();
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] arrayCopy() {
+        // The heap copy is created once and reused until the next synchronize().
+        if (dataCopy == null) {
+            dataCopy = new byte[len];
+
+            UNSAFE.copyMemory(null, data, dataCopy, BYTE_ARR_OFF, dataCopy.length);
+        }
+
+        return dataCopy;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long offheapPointer() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasArray() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void synchronize() {
+        data = mem.data();
+        len = mem.length();
+
+        // The cached heap copy was taken from the previous address/length and may no longer match the memory.
+        dataCopy = null;
+    }
+
+    /**
+     * Ensure there is enough data in the stream.
+     *
+     * @param cnt Amount of bytes expected to be available.
+     */
+    private void ensureEnoughData(int cnt) {
+        // A negative count can only be produced by int overflow or an invalid argument; it must not reach
+        // the raw memory copy.
+        if (cnt < 0 || remaining() < cnt)
+            throw new IgniteException("Not enough data to read the value [position=" + pos +
+                ", requiredBytes=" + cnt + ", remainingBytes=" + remaining() + ']');
+    }
+
+    /**
+     * Copy required amount of data and shift position.
+     *
+     * @param target Target to copy data to.
+     * @param off Offset inside the target (array base offset included).
+     * @param cnt Amount of bytes to copy.
+     */
+    private void copyAndShift(Object target, long off, int cnt) {
+        ensureEnoughData(cnt);
+
+        UNSAFE.copyMemory(null, data + pos, target, off, cnt);
+
+        shift(cnt);
+    }
+
+    /**
+     * Shift position to the right.
+     *
+     * @param cnt Amount of bytes.
+     */
+    private void shift(int cnt) {
+        pos += cnt;
+
+        assert pos <= len;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java
new file mode 100644
index 0000000..9d8f94e
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+/**
+ * Interop memory chunk. Closing the chunk releases it, so it can be used in a try-with-resources statement.
+ */
+public interface PlatformMemory extends AutoCloseable {
+ /**
+ * Gets input stream.
+ *
+ * @return Input stream.
+ */
+ public PlatformInputStream input();
+
+ /**
+ * Gets output stream.
+ *
+ * @return Output stream.
+ */
+ public PlatformOutputStream output();
+
+ /**
+ * Gets pointer which can be passed between platforms.
+ *
+ * @return Pointer.
+ */
+ public long pointer();
+
+ /**
+ * Gets data pointer, i.e. the address {@link PlatformInputStreamImpl} reads the chunk content from.
+ *
+ * @return Data pointer.
+ */
+ public long data();
+
+ /**
+ * Gets capacity.
+ *
+ * @return Capacity.
+ */
+ public int capacity();
+
+ /**
+ * Gets length, i.e. the amount of data {@link PlatformInputStreamImpl} treats as available for reading.
+ *
+ * @return Length.
+ */
+ public int length();
+
+ /**
+ * Reallocate memory chunk.
+ *
+ * @param cap Minimum capacity.
+ */
+ public void reallocate(int cap);
+
+ /**
+ * Close memory releasing it. Unlike {@link AutoCloseable#close()}, it declares no checked exceptions.
+ */
+ @Override void close();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java
new file mode 100644
index 0000000..c2233a8
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+/**
+ * Interop memory manager interface.
+ */
+public interface PlatformMemoryManager {
+ /**
+ * Allocates memory without an explicit capacity; {@link PlatformMemoryManagerImpl} uses its default capacity.
+ *
+ * @return Memory.
+ */
+ public PlatformMemory allocate();
+
+ /**
+ * Allocates memory having at least the given capacity.
+ *
+ * @param cap Minimum capacity.
+ * @return Memory.
+ */
+ public PlatformMemory allocate(int cap);
+
+ /**
+ * Gets memory from existing pointer.
+ *
+ * @param memPtr Cross-platform memory pointer.
+ * @return Memory.
+ */
+ public PlatformMemory get(long memPtr);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManagerImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManagerImpl.java
new file mode 100644
index 0000000..83388e0
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManagerImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import org.apache.ignite.internal.processors.platform.callback.*;
+import org.jetbrains.annotations.*;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*;
+
+/**
+ * Interop memory manager implementation. Allocations go through a thread-local {@link PlatformMemoryPool};
+ * existing pointers are wrapped according to the flags stored in their header.
+ */
+public class PlatformMemoryManagerImpl implements PlatformMemoryManager {
+    /** Native gateway. */
+    private final PlatformCallbackGateway gate;
+
+    /** Capacity used by {@link #allocate()}. */
+    private final int dfltCap;
+
+    /** Memory pool of the current thread, created on first use. */
+    private final ThreadLocal<PlatformMemoryPool> threadLocPool = new ThreadLocal<>();
+
+    /**
+     * Constructor.
+     *
+     * @param gate Native gateway.
+     * @param dfltCap Default memory chunk capacity.
+     */
+    public PlatformMemoryManagerImpl(@Nullable PlatformCallbackGateway gate, int dfltCap) {
+        this.gate = gate;
+        this.dfltCap = dfltCap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public PlatformMemory allocate() {
+        return allocate(dfltCap);
+    }
+
+    /** {@inheritDoc} */
+    @Override public PlatformMemory allocate(int cap) {
+        return pool().allocate(cap);
+    }
+
+    /** {@inheritDoc} */
+    @Override public PlatformMemory get(long memPtr) {
+        int flags = flags(memPtr);
+
+        // External flag is checked first, then the pooled one; anything else is an unpooled chunk.
+        if (isExternal(flags))
+            return new PlatformExternalMemory(gate, memPtr);
+
+        if (isPooled(flags))
+            return pool().get(memPtr);
+
+        return new PlatformUnpooledMemory(memPtr);
+    }
+
+    /**
+     * Returns the memory pool of the current thread, creating and remembering it on the first call.
+     *
+     * @return Memory pool.
+     */
+    private PlatformMemoryPool pool() {
+        PlatformMemoryPool res = threadLocPool.get();
+
+        if (res != null)
+            return res;
+
+        res = new PlatformMemoryPool();
+
+        threadLocPool.set(res);
+
+        return res;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryPool.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryPool.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryPool.java
new file mode 100644
index 0000000..75db4b9
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryPool.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*;
+
+/**
+ * Memory pool associated with a thread. Wraps a native pool header which holds three reusable memory chunks.
+ */
+public class PlatformMemoryPool {
+    /** Base pointer of the native pool. */
+    private final long poolPtr;
+
+    /** Wrapper of the first pooled memory chunk, created on first access. */
+    private PlatformPooledMemory mem1;
+
+    /** Wrapper of the second pooled memory chunk, created on first access. */
+    private PlatformPooledMemory mem2;
+
+    /** Wrapper of the third pooled memory chunk, created on first access. */
+    private PlatformPooledMemory mem3;
+
+    /**
+     * Allocates the native pool and registers a {@code sun.misc.Cleaner} whose action releases it.
+     */
+    public PlatformMemoryPool() {
+        poolPtr = allocatePool();
+
+        sun.misc.Cleaner.create(this, new CleanerRunnable(poolPtr));
+    }
+
+    /**
+     * Allocate memory chunk, optionally pooling it.
+     *
+     * @param cap Minimum capacity.
+     * @return Memory chunk.
+     */
+    public PlatformMemory allocate(int cap) {
+        long memPtr = allocatePooled(poolPtr, cap);
+
+        // Zero pointer means no pooled chunk could be acquired, so fall back to unpooled memory.
+        if (memPtr == 0)
+            return new PlatformUnpooledMemory(allocateUnpooled(cap));
+
+        return get(memPtr);
+    }
+
+    /**
+     * Re-allocate existing pool memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @param cap Minimum capacity.
+     */
+    void reallocate(long memPtr, int cap) {
+        reallocatePooled(memPtr, cap);
+    }
+
+    /**
+     * Release pooled memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     */
+    void release(long memPtr) {
+        releasePooled(memPtr);
+    }
+
+    /**
+     * Get pooled memory chunk. The chunk is identified by the offset of its pointer from the pool base.
+     *
+     * @param memPtr Memory pointer.
+     * @return Memory chunk.
+     */
+    public PlatformMemory get(long memPtr) {
+        long off = memPtr - poolPtr;
+
+        if (off == POOL_HDR_OFF_MEM_1) {
+            if (mem1 == null)
+                mem1 = new PlatformPooledMemory(this, memPtr);
+
+            return mem1;
+        }
+
+        if (off == POOL_HDR_OFF_MEM_2) {
+            if (mem2 == null)
+                mem2 = new PlatformPooledMemory(this, memPtr);
+
+            return mem2;
+        }
+
+        // Only the third chunk is left.
+        assert off == POOL_HDR_OFF_MEM_3;
+
+        if (mem3 == null)
+            mem3 = new PlatformPooledMemory(this, memPtr);
+
+        return mem3;
+    }
+
+    /**
+     * Cleaner action which releases the native pool. Holds only the pool pointer, not the pool object.
+     */
+    private static class CleanerRunnable implements Runnable {
+        /** Pool pointer to release. */
+        private final long poolPtr;
+
+        /**
+         * Constructor.
+         *
+         * @param poolPtr Pointer.
+         */
+        private CleanerRunnable(long poolPtr) {
+            assert poolPtr != 0;
+
+            this.poolPtr = poolPtr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            PlatformMemoryUtils.releasePool(poolPtr);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryUtils.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryUtils.java
new file mode 100644
index 0000000..c5ca971
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryUtils.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import org.apache.ignite.internal.util.*;
+import sun.misc.*;
+
+import java.nio.*;
+
+/**
+ * Utility methods and constants for off-heap memory management.
+ * <p>
+ * Memory chunk header layout ({@link #MEM_HDR_LEN} bytes): data pointer ({@code long} at offset 0), capacity
+ * ({@code int} at {@link #MEM_HDR_OFF_CAP}), length ({@code int} at {@link #MEM_HDR_OFF_LEN}) and flags
+ * ({@code int} at {@link #MEM_HDR_OFF_FLAGS}). A pool header ({@link #POOL_HDR_LEN} bytes) holds three such
+ * chunk headers at {@link #POOL_HDR_OFF_MEM_1}, {@link #POOL_HDR_OFF_MEM_2} and {@link #POOL_HDR_OFF_MEM_3}.
+ */
+public class PlatformMemoryUtils {
+    /** Unsafe instance. */
+    public static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+    /** Array offset: boolean. */
+    public static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
+
+    /** Array offset: byte. */
+    public static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+    /** Array offset: short. */
+    public static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
+
+    /** Array offset: char. */
+    public static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
+
+    /** Array offset: int. */
+    public static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
+
+    /** Array offset: float. */
+    public static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
+
+    /** Array offset: long. */
+    public static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
+
+    /** Array offset: double. */
+    public static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
+
+    /** Whether little endian is used on the platform. */
+    public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
+
+    /** Header length. */
+    public static final int POOL_HDR_LEN = 64;
+
+    /** Pool header offset: first memory chunk. */
+    public static final int POOL_HDR_OFF_MEM_1 = 0;
+
+    /** Pool header offset: second memory chunk. */
+    public static final int POOL_HDR_OFF_MEM_2 = 20;
+
+    /** Pool header offset: third memory chunk. */
+    public static final int POOL_HDR_OFF_MEM_3 = 40;
+
+    /** Memory chunk header length. */
+    public static final int MEM_HDR_LEN = 20;
+
+    /** Offset: capacity. */
+    public static final int MEM_HDR_OFF_CAP = 8;
+
+    /** Offset: length. */
+    public static final int MEM_HDR_OFF_LEN = 12;
+
+    /** Offset: flags. */
+    public static final int MEM_HDR_OFF_FLAGS = 16;
+
+    /** Flag: external. */
+    public static final int FLAG_EXT = 0x1;
+
+    /** Flag: pooled. */
+    public static final int FLAG_POOLED = 0x2;
+
+    /** Flag: whether this pooled memory chunk is acquired. */
+    public static final int FLAG_ACQUIRED = 0x4;
+
+    /** --- COMMON METHODS. --- */
+
+    /**
+     * Gets data pointer for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @return Data pointer.
+     */
+    public static long data(long memPtr) {
+        return UNSAFE.getLong(memPtr);
+    }
+
+    /**
+     * Gets capacity for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @return Capacity.
+     */
+    public static int capacity(long memPtr) {
+        return UNSAFE.getInt(memPtr + MEM_HDR_OFF_CAP);
+    }
+
+    /**
+     * Sets capacity for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @param cap Capacity.
+     */
+    public static void capacity(long memPtr, int cap) {
+        assert !isExternal(memPtr) : "Attempt to update external memory chunk capacity: " + memPtr;
+
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap);
+    }
+
+    /**
+     * Gets length for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @return Length.
+     */
+    public static int length(long memPtr) {
+        return UNSAFE.getInt(memPtr + MEM_HDR_OFF_LEN);
+    }
+
+    /**
+     * Sets length for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @param len Length.
+     */
+    public static void length(long memPtr, int len) {
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_LEN, len);
+    }
+
+    /**
+     * Gets flags for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @return Flags.
+     */
+    public static int flags(long memPtr) {
+        return UNSAFE.getInt(memPtr + MEM_HDR_OFF_FLAGS);
+    }
+
+    /**
+     * Sets flags for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @param flags Flags.
+     */
+    public static void flags(long memPtr, int flags) {
+        assert !isExternal(memPtr) : "Attempt to update external memory chunk flags: " + memPtr;
+
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_FLAGS, flags);
+    }
+
+    /**
+     * Check whether this memory chunk is external.
+     *
+     * @param memPtr Memory pointer.
+     * @return {@code True} if owned by native platform.
+     */
+    public static boolean isExternal(long memPtr) {
+        return isExternal(flags(memPtr));
+    }
+
+    /**
+     * Check whether flags denote that this memory chunk is external.
+     *
+     * @param flags Flags.
+     * @return {@code True} if owned by native platform.
+     */
+    public static boolean isExternal(int flags) {
+        return (flags & FLAG_EXT) != 0;
+    }
+
+    /**
+     * Check whether this memory chunk is pooled.
+     *
+     * @param memPtr Memory pointer.
+     * @return {@code True} if pooled.
+     */
+    public static boolean isPooled(long memPtr) {
+        return isPooled(flags(memPtr));
+    }
+
+    /**
+     * Check whether flags denote pooled memory chunk.
+     *
+     * @param flags Flags.
+     * @return {@code True} if pooled.
+     */
+    public static boolean isPooled(int flags) {
+        return (flags & FLAG_POOLED) != 0;
+    }
+
+    /**
+     * Check whether this memory chunk is pooled and acquired.
+     *
+     * @param memPtr Memory pointer.
+     * @return {@code True} if pooled and acquired.
+     */
+    public static boolean isAcquired(long memPtr) {
+        return isAcquired(flags(memPtr));
+    }
+
+    /**
+     * Check whether flags denote pooled and acquired memory chunk.
+     *
+     * @param flags Flags.
+     * @return {@code True} if acquired.
+     */
+    public static boolean isAcquired(int flags) {
+        assert isPooled(flags);
+
+        return (flags & FLAG_ACQUIRED) != 0;
+    }
+
+    /** --- UNPOOLED MEMORY MANAGEMENT. --- */
+
+    /**
+     * Allocate unpooled memory chunk: a header plus a separately allocated data buffer.
+     *
+     * @param cap Minimum capacity.
+     * @return New memory pointer.
+     */
+    public static long allocateUnpooled(int cap) {
+        assert cap > 0;
+
+        long memPtr = UNSAFE.allocateMemory(MEM_HDR_LEN);
+
+        long dataPtr;
+
+        try {
+            dataPtr = UNSAFE.allocateMemory(cap);
+        }
+        catch (RuntimeException | Error e) {
+            // Do not leak the already allocated header if the data buffer cannot be allocated.
+            UNSAFE.freeMemory(memPtr);
+
+            throw e;
+        }
+
+        UNSAFE.putLong(memPtr, dataPtr); // Write address.
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); // Write capacity.
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_LEN, 0); // Write length.
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_FLAGS, 0); // Write flags.
+
+        return memPtr;
+    }
+
+    /**
+     * Reallocate unpooled memory chunk. The data buffer is always resized to exactly {@code cap} bytes.
+     *
+     * @param memPtr Memory pointer.
+     * @param cap Minimum capacity.
+     */
+    public static void reallocateUnpooled(long memPtr, int cap) {
+        assert cap > 0;
+
+        assert !isExternal(memPtr) : "Attempt to reallocate external memory chunk directly: " + memPtr;
+        assert !isPooled(memPtr) : "Attempt to reallocate pooled memory chunk directly: " + memPtr;
+
+        long dataPtr = data(memPtr);
+
+        long newDataPtr = UNSAFE.reallocateMemory(dataPtr, cap);
+
+        if (dataPtr != newDataPtr)
+            UNSAFE.putLong(memPtr, newDataPtr); // Write new data address if needed.
+
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); // Write new capacity.
+    }
+
+    /**
+     * Release unpooled memory chunk: frees both the data buffer and the header.
+     *
+     * @param memPtr Memory pointer.
+     */
+    public static void releaseUnpooled(long memPtr) {
+        assert !isExternal(memPtr) : "Attempt to release external memory chunk directly: " + memPtr;
+        assert !isPooled(memPtr) : "Attempt to release pooled memory chunk directly: " + memPtr;
+
+        UNSAFE.freeMemory(data(memPtr));
+        UNSAFE.freeMemory(memPtr);
+    }
+
+    /** --- POOLED MEMORY MANAGEMENT. --- */
+
+    /**
+     * Allocate pool memory. The header is zeroed and all three chunks are marked as pooled (not acquired).
+     *
+     * @return Pool pointer.
+     */
+    public static long allocatePool() {
+        long poolPtr = UNSAFE.allocateMemory(POOL_HDR_LEN);
+
+        UNSAFE.setMemory(poolPtr, POOL_HDR_LEN, (byte)0);
+
+        flags(poolPtr + POOL_HDR_OFF_MEM_1, FLAG_POOLED);
+        flags(poolPtr + POOL_HDR_OFF_MEM_2, FLAG_POOLED);
+        flags(poolPtr + POOL_HDR_OFF_MEM_3, FLAG_POOLED);
+
+        return poolPtr;
+    }
+
+    /**
+     * Release pool memory: frees the data buffers of the three chunks (if allocated) and the pool header.
+     *
+     * @param poolPtr Pool pointer.
+     */
+    public static void releasePool(long poolPtr) {
+        // Clean predefined memory chunks.
+        long mem = UNSAFE.getLong(poolPtr + POOL_HDR_OFF_MEM_1);
+
+        if (mem != 0)
+            UNSAFE.freeMemory(mem);
+
+        mem = UNSAFE.getLong(poolPtr + POOL_HDR_OFF_MEM_2);
+
+        if (mem != 0)
+            UNSAFE.freeMemory(mem);
+
+        mem = UNSAFE.getLong(poolPtr + POOL_HDR_OFF_MEM_3);
+
+        if (mem != 0)
+            UNSAFE.freeMemory(mem);
+
+        // Clean pool chunk.
+        UNSAFE.freeMemory(poolPtr);
+    }
+
+    /**
+     * Allocate pooled memory chunk. Chunks are tried in order: first, second, third.
+     *
+     * @param poolPtr Pool pointer.
+     * @param cap Capacity.
+     * @return Cross-platform memory pointer or {@code 0} in case there are no free memory chunks in the pool.
+     */
+    public static long allocatePooled(long poolPtr, int cap) {
+        long memPtr1 = poolPtr + POOL_HDR_OFF_MEM_1;
+
+        if (isAcquired(memPtr1)) {
+            long memPtr2 = poolPtr + POOL_HDR_OFF_MEM_2;
+
+            if (isAcquired(memPtr2)) {
+                long memPtr3 = poolPtr + POOL_HDR_OFF_MEM_3;
+
+                if (isAcquired(memPtr3))
+                    return 0L;
+                else {
+                    allocatePooled0(memPtr3, cap);
+
+                    return memPtr3;
+                }
+            }
+            else {
+                allocatePooled0(memPtr2, cap);
+
+                return memPtr2;
+            }
+        }
+        else {
+            allocatePooled0(memPtr1, cap);
+
+            return memPtr1;
+        }
+    }
+
+    /**
+     * Internal pooled memory chunk allocation routine. Allocates or grows the data buffer when needed and
+     * marks the chunk as acquired.
+     *
+     * @param memPtr Memory pointer.
+     * @param cap Capacity.
+     */
+    private static void allocatePooled0(long memPtr, int cap) {
+        assert !isExternal(memPtr);
+        assert isPooled(memPtr);
+        assert !isAcquired(memPtr);
+
+        long data = UNSAFE.getLong(memPtr);
+
+        if (data == 0) {
+            // First allocation of the chunk.
+            data = UNSAFE.allocateMemory(cap);
+
+            UNSAFE.putLong(memPtr, data);
+            UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap);
+        }
+        else {
+            // Ensure that we have enough capacity.
+            int curCap = capacity(memPtr);
+
+            if (cap > curCap) {
+                data = UNSAFE.reallocateMemory(data, cap);
+
+                UNSAFE.putLong(memPtr, data);
+                UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap);
+            }
+        }
+
+        flags(memPtr, FLAG_POOLED | FLAG_ACQUIRED);
+    }
+
+    /**
+     * Reallocate pooled memory chunk. The data buffer only grows; a smaller capacity is a no-op.
+     *
+     * @param memPtr Memory pointer.
+     * @param cap Minimum capacity.
+     */
+    public static void reallocatePooled(long memPtr, int cap) {
+        assert !isExternal(memPtr);
+        assert isPooled(memPtr);
+        assert isAcquired(memPtr);
+
+        long data = UNSAFE.getLong(memPtr);
+
+        assert data != 0;
+
+        int curCap = capacity(memPtr);
+
+        if (cap > curCap) {
+            data = UNSAFE.reallocateMemory(data, cap);
+
+            UNSAFE.putLong(memPtr, data);
+            UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap);
+        }
+    }
+
+    /**
+     * Release pooled memory chunk. Only the acquired flag is cleared; the data buffer is kept for reuse.
+     *
+     * @param memPtr Memory pointer.
+     */
+    public static void releasePooled(long memPtr) {
+        assert !isExternal(memPtr);
+        assert isPooled(memPtr);
+        assert isAcquired(memPtr);
+
+        // Clear the bit rather than toggle it: a repeated release must never mark the chunk as acquired
+        // again when assertions are disabled.
+        flags(memPtr, flags(memPtr) & ~FLAG_ACQUIRED);
+    }
+
+    /** --- UTILITY STUFF. --- */
+
+    /**
+     * Reallocate arbitrary memory chunk, dispatching on its pooled flag.
+     *
+     * @param memPtr Memory pointer.
+     * @param cap Capacity.
+     */
+    public static void reallocate(long memPtr, int cap) {
+        int flags = flags(memPtr);
+
+        if (isPooled(flags))
+            reallocatePooled(memPtr, cap);
+        else {
+            assert !isExternal(flags);
+
+            reallocateUnpooled(memPtr, cap);
+        }
+    }
+
+    /**
+     * Private constructor: the class is not meant to be instantiated.
+     */
+    private PlatformMemoryUtils() {
+        // No-op.
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java
new file mode 100644
index 0000000..eb2490a
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import org.apache.ignite.internal.portable.streams.*;
+
+/**
+ * Interop output stream: a portable output stream that can additionally be synchronized with
+ * the memory it writes to.
+ */
+public interface PlatformOutputStream extends PortableOutputStream {
+    /**
+     * Synchronize output stream with underlying memory.
+     */
+    void synchronize();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java
new file mode 100644
index 0000000..6c7c865
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*;
+
+/**
+ * Interop output stream implementation.
+ * <p>
+ * Values are written through {@code UNSAFE} directly to the address returned by {@code PlatformMemory.data()}.
+ * Before every write the stream makes sure the chunk is large enough, reallocating it through
+ * {@code PlatformMemory.reallocate(int)} when needed and refreshing the cached pointer and capacity.
+ */
+public class PlatformOutputStreamImpl implements PlatformOutputStream {
+    /** Underlying memory chunk. */
+    protected final PlatformMemory mem;
+
+    /** Cached data pointer of the chunk; refreshed after every reallocation. */
+    protected long data;
+
+    /** Cached capacity of the chunk; refreshed after every reallocation. */
+    protected int cap;
+
+    /** Current position. */
+    protected int pos;
+
+    /**
+     * Constructor.
+     *
+     * @param mem Underlying memory chunk.
+     */
+    public PlatformOutputStreamImpl(PlatformMemory mem) {
+        this.mem = mem;
+
+        data = mem.data();
+        cap = mem.capacity();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByte(byte val) {
+        ensureCapacity(pos + 1);
+
+        UNSAFE.putByte(data + pos++, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByteArray(byte[] val) {
+        copyAndShift(val, BYTE_ARR_OFF, val.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBoolean(boolean val) {
+        writeByte(val ? (byte) 1 : (byte) 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBooleanArray(boolean[] val) {
+        copyAndShift(val, BOOLEAN_ARR_OFF, val.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShort(short val) {
+        ensureCapacity(pos + 2);
+
+        UNSAFE.putShort(data + pos, val);
+
+        shift(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShortArray(short[] val) {
+        copyAndShift(val, SHORT_ARR_OFF, val.length << 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeChar(char val) {
+        ensureCapacity(pos + 2);
+
+        UNSAFE.putChar(data + pos, val);
+
+        shift(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeCharArray(char[] val) {
+        copyAndShift(val, CHAR_ARR_OFF, val.length << 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int val) {
+        ensureCapacity(pos + 4);
+
+        UNSAFE.putInt(data + pos, val);
+
+        shift(4);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeIntArray(int[] val) {
+        copyAndShift(val, INT_ARR_OFF, val.length << 2);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int pos, int val) {
+        // Positional write: the stream position (this.pos) is intentionally left untouched.
+        ensureCapacity(pos + 4);
+
+        UNSAFE.putInt(data + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloat(float val) {
+        writeInt(Float.floatToIntBits(val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloatArray(float[] val) {
+        copyAndShift(val, FLOAT_ARR_OFF, val.length << 2);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLong(long val) {
+        ensureCapacity(pos + 8);
+
+        UNSAFE.putLong(data + pos, val);
+
+        shift(8);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLongArray(long[] val) {
+        copyAndShift(val, LONG_ARR_OFF, val.length << 3);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDouble(double val) {
+        writeLong(Double.doubleToLongBits(val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDoubleArray(double[] val) {
+        copyAndShift(val, DOUBLE_ARR_OFF, val.length << 3);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(byte[] arr, int off, int len) {
+        copyAndShift(arr, BYTE_ARR_OFF + off, len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(long addr, int cnt) {
+        // Null source object makes "addr" an absolute address for the copy.
+        copyAndShift(null, addr, cnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int position() {
+        return pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void position(int pos) {
+        ensureCapacity(pos);
+
+        this.pos = pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] array() {
+        assert false;
+
+        throw new UnsupportedOperationException("Should not be called.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] arrayCopy() {
+        assert false;
+
+        throw new UnsupportedOperationException("Should not be called.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public long offheapPointer() {
+        assert false;
+
+        throw new UnsupportedOperationException("Should not be called.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasArray() {
+        assert false;
+
+        throw new UnsupportedOperationException("Should not be called.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void synchronize() {
+        // Pass current position to the memory chunk as its length.
+        PlatformMemoryUtils.length(mem.pointer(), pos);
+    }
+
+    /**
+     * Ensure capacity, reallocating the underlying memory chunk when it is too small. The chunk is asked
+     * for at least twice the current capacity so that a series of small writes does not reallocate each time.
+     *
+     * @param reqCap Required byte count.
+     * @throws IllegalArgumentException If {@code reqCap} is negative, which happens when a negative position
+     *      is requested or when {@code position + length} overflows {@code int}.
+     */
+    protected void ensureCapacity(int reqCap) {
+        // A negative value would pass the "reqCap > cap" check below without reallocation and the
+        // following raw write would go outside of the chunk, so fail fast instead.
+        if (reqCap < 0)
+            throw new IllegalArgumentException("Required capacity is negative or overflows int [pos=" + pos +
+                ", reqCap=" + reqCap + ']');
+
+        if (reqCap > cap) {
+            int newCap = cap << 1;
+
+            // Also covers overflow of the doubled value, which turns negative.
+            if (newCap < reqCap)
+                newCap = reqCap;
+
+            mem.reallocate(newCap);
+
+            assert mem.capacity() >= newCap;
+
+            // Re-read both cached values from the chunk, the same way the constructor does.
+            data = mem.data();
+            cap = mem.capacity();
+        }
+    }
+
+    /**
+     * Shift position.
+     *
+     * @param cnt Byte count.
+     */
+    protected void shift(int cnt) {
+        pos += cnt;
+    }
+
+    /**
+     * Copy source object to the stream shifting position afterwards.
+     *
+     * @param src Source object, or {@code null} when {@code off} is an absolute address.
+     * @param off Offset.
+     * @param len Length in bytes.
+     * @throws IllegalArgumentException If {@code len} is negative, which happens when the byte length of
+     *      a large array overflows {@code int}.
+     */
+    private void copyAndShift(Object src, long off, int len) {
+        if (len < 0)
+            throw new IllegalArgumentException("Length is negative or overflows int: " + len);
+
+        ensureCapacity(pos + len);
+
+        UNSAFE.copyMemory(src, off, null, data + pos, len);
+
+        shift(len);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformPooledMemory.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformPooledMemory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformPooledMemory.java
new file mode 100644
index 0000000..98a9a58
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformPooledMemory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*;
+
+/**
+ * Interop pooled memory chunk. Reallocation and release are delegated to the owning pool.
+ */
+public class PlatformPooledMemory extends PlatformAbstractMemory {
+    /** Owning memory pool. */
+    private final PlatformMemoryPool pool;
+
+    /**
+     * Constructor. The chunk is expected to be pooled and already acquired (checked by assertions only).
+     *
+     * @param pool Owning memory pool.
+     * @param memPtr Cross-platform memory pointer.
+     */
+    public PlatformPooledMemory(PlatformMemoryPool pool, long memPtr) {
+        super(memPtr);
+
+        assert isPooled(memPtr);
+        assert isAcquired(memPtr);
+
+        this.pool = pool;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reallocate(int cap) {
+        assert isAcquired(memPtr);
+
+        // Ask for at least twice the current capacity to avoid excessive allocations.
+        int newCap = Math.max(cap, PlatformMemoryUtils.capacity(memPtr) << 1);
+
+        pool.reallocate(memPtr, newCap);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        assert isAcquired(memPtr);
+
+        // Chunk is handed back to the pool.
+        pool.release(memPtr);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformUnpooledMemory.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformUnpooledMemory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformUnpooledMemory.java
new file mode 100644
index 0000000..a236dab
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformUnpooledMemory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.*;
+
+/**
+ * Interop un-pooled memory chunk. Reallocation and release go through the un-pooled routines of
+ * {@code PlatformMemoryUtils}.
+ */
+public class PlatformUnpooledMemory extends PlatformAbstractMemory {
+    /**
+     * Constructor.
+     *
+     * @param memPtr Cross-platform memory pointer.
+     */
+    public PlatformUnpooledMemory(long memPtr) {
+        super(memPtr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reallocate(int cap) {
+        // Ask for at least twice the current capacity to avoid excessive allocations.
+        int newCap = Math.max(cap, PlatformMemoryUtils.capacity(memPtr) << 1);
+
+        reallocateUnpooled(memPtr, newCap);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        releaseUnpooled(memPtr);
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderBiClosure.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderBiClosure.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderBiClosure.java
new file mode 100644
index 0000000..ea808d7
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderBiClosure.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.utils;
+
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.lang.*;
+
+/**
+ * Reader bi-closure: reads a pair of values from a raw reader.
+ *
+ * @param <T1> Type of the first tuple element.
+ * @param <T2> Type of the second tuple element.
+ */
+public interface PlatformReaderBiClosure<T1, T2> {
+    /**
+     * Read a tuple from reader.
+     *
+     * @param reader Reader.
+     * @return Tuple holding both values.
+     */
+    IgniteBiTuple<T1, T2> read(PortableRawReaderEx reader);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderClosure.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderClosure.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderClosure.java
new file mode 100644
index 0000000..cd0523c
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformReaderClosure.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.utils;
+
+import org.apache.ignite.internal.portable.*;
+
+/**
+ * Reader closure: reads a single value from a raw reader.
+ *
+ * @param <T> Type of the value being read.
+ */
+public interface PlatformReaderClosure<T> {
+    /**
+     * Read object from reader.
+     *
+     * @param reader Reader.
+     * @return Object.
+     */
+    T read(PortableRawReaderEx reader);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2225f8d2/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformWriterBiClosure.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformWriterBiClosure.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformWriterBiClosure.java
new file mode 100644
index 0000000..89a73b0
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformWriterBiClosure.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.utils;
+
+import org.apache.ignite.internal.portable.*;
+
+/**
+ * Interop writer bi-closure: writes a pair of values to a raw writer.
+ *
+ * @param <T1> Type of the first value.
+ * @param <T2> Type of the second value.
+ */
+public interface PlatformWriterBiClosure<T1, T2> {
+    /**
+     * Write values.
+     *
+     * @param writer Writer.
+     * @param val1 Value 1.
+     * @param val2 Value 2.
+     */
+    void write(PortableRawWriterEx writer, T1 val1, T2 val2);
+}