You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/03 11:28:30 UTC
[02/14] ignite git commit: IGNITE-1364: WIP.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/cache/query/query_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/cache/query/query_impl.cpp b/modules/platform/src/main/cpp/core/src/impl/cache/query/query_impl.cpp
new file mode 100644
index 0000000..d1800b3
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/cache/query/query_impl.cpp
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+#include "gridgain/impl/cache/query/query_impl.h"
+
+using namespace ignite::common::concurrent;
+using namespace ignite::common::java;
+using namespace gridgain::impl::interop;
+using namespace gridgain::impl::portable;
+
+namespace gridgain
+{
+ namespace impl
+ {
+ namespace cache
+ {
+ namespace query
+ {
+ /** Operation: get all entries. */
+ const int32_t OP_GET_ALL = 1;
+
+ /** Operation: get single entry. */
+ const int32_t OP_GET_SINGLE = 3;
+
+ QueryCursorImpl::QueryCursorImpl(SharedPointer<GridEnvironment> env, jobject javaRef) :
+ env(env), javaRef(javaRef), iterCalled(false), getAllCalled(false), hasNext(false)
+ {
+ // No-op.
+ }
+
+ QueryCursorImpl::~QueryCursorImpl()
+ {
+ // 1. Close the cursor.
+ env.Get()->Context()->QueryCursorClose(javaRef);
+
+ // 2. Release Java reference.
+ JniContext::Release(javaRef);
+ }
+
+ bool QueryCursorImpl::HasNext(GridError* err)
+ {
+ // Check whether GetAll() was called earlier.
+ if (getAllCalled)
+ {
+ *err = GridError(GridError::GG_ERR_GENERIC,
+ "Cannot use HasNext() method because GetAll() was called.");
+
+ return false;
+ }
+
+ // Create iterator in Java if needed.
+ if (!CreateIteratorIfNeeded(err))
+ return false;
+
+ return hasNext;
+ }
+
+ void QueryCursorImpl::GetNext(OutputOperation& op, GridError* err)
+ {
+ // Check whether GetAll() was called earlier.
+ if (getAllCalled)
+ {
+ *err = GridError(GridError::GG_ERR_GENERIC,
+ "Cannot use GetNext() method because GetAll() was called.");
+
+ return;
+ }
+
+ // Create iterator in Java if needed.
+ if (!CreateIteratorIfNeeded(err))
+ return;
+
+ if (hasNext)
+ {
+ JniErrorInfo jniErr;
+
+ SharedPointer<InteropMemory> inMem = env.Get()->AllocateMemory();
+
+ env.Get()->Context()->TargetOutStream(javaRef, OP_GET_SINGLE, inMem.Get()->PointerLong(), &jniErr);
+
+ GridError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+ if (jniErr.code == IGNITE_JNI_ERR_SUCCESS)
+ {
+ InteropInputStream in(inMem.Get());
+
+ portable::PortableReaderImpl reader(&in);
+
+ op.ProcessOutput(reader);
+
+ hasNext = IteratorHasNext(err);
+ }
+ }
+ else
+ {
+ // Ensure we do not overwrite possible previous error.
+ if (err->GetCode() == GridError::GG_SUCCESS)
+ *err = GridError(GridError::GG_ERR_GENERIC, "No more elements available.");
+ }
+ }
+
+ void QueryCursorImpl::GetAll(OutputOperation& op, GridError* err)
+ {
+ // Check whether any of iterator methods were called.
+ if (iterCalled)
+ {
+ *err = GridError(GridError::GG_ERR_GENERIC,
+ "Cannot use GetAll() method because an iteration method was called.");
+
+ return;
+ }
+
+ // Check whether GetAll was called before.
+ if (getAllCalled)
+ {
+ *err = GridError(GridError::GG_ERR_GENERIC,
+ "Cannot use GetNext() method because GetAll() was called.");
+
+ return;
+ }
+
+ // Get data.
+ JniErrorInfo jniErr;
+
+ SharedPointer<InteropMemory> inMem = env.Get()->AllocateMemory();
+
+ env.Get()->Context()->TargetOutStream(javaRef, OP_GET_ALL, inMem.Get()->PointerLong(), &jniErr);
+
+ GridError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+ if (jniErr.code == IGNITE_JNI_ERR_SUCCESS)
+ {
+ getAllCalled = true;
+
+ InteropInputStream in(inMem.Get());
+
+ portable::PortableReaderImpl reader(&in);
+
+ op.ProcessOutput(reader);
+ }
+ }
+
+ bool QueryCursorImpl::CreateIteratorIfNeeded(GridError* err)
+ {
+ if (!iterCalled)
+ {
+ JniErrorInfo jniErr;
+
+ env.Get()->Context()->QueryCursorIterator(javaRef, &jniErr);
+
+ GridError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+ if (jniErr.code == IGNITE_JNI_ERR_SUCCESS)
+ {
+ iterCalled = true;
+
+ hasNext = IteratorHasNext(err);
+ }
+ else
+ return false;
+ }
+
+ return true;
+ }
+
+ bool QueryCursorImpl::IteratorHasNext(GridError* err)
+ {
+ JniErrorInfo jniErr;
+
+ bool res = env.Get()->Context()->QueryCursorIteratorHasNext(javaRef, &jniErr);
+
+ GridError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+ return jniErr.code == IGNITE_JNI_ERR_SUCCESS && res;
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/grid_environment.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/grid_environment.cpp b/modules/platform/src/main/cpp/core/src/impl/grid_environment.cpp
new file mode 100644
index 0000000..f0e243b
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/grid_environment.cpp
@@ -0,0 +1,158 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+#include "gridgain/impl/portable/portable_reader_impl.h"
+#include "gridgain/impl/grid_environment.h"
+#include "gridgain/portable/portable.h"
+
+using namespace ignite::common::concurrent;
+using namespace ignite::common::java;
+using namespace gridgain::impl::interop;
+using namespace gridgain::impl::portable;
+using namespace gridgain::portable;
+
+namespace gridgain
+{
+ namespace impl
+ {
+ /**
+ * OnStart callback.
+ *
+ * @param target Target environment.
+ * @param memPtr Memory pointer.
+ */
+ void IGNITE_CALL OnStart(void* target, long long memPtr)
+ {
+ SharedPointer<GridEnvironment>* ptr = static_cast<SharedPointer<GridEnvironment>*>(target);
+
+ ptr->Get()->OnStartCallback(memPtr);
+ }
+
+ /**
+ * OnStop callback.
+ *
+ * @param target Target environment.
+ */
+ void IGNITE_CALL OnStop(void* target)
+ {
+ SharedPointer<GridEnvironment>* ptr = static_cast<SharedPointer<GridEnvironment>*>(target);
+
+ delete ptr;
+ }
+
+ GridEnvironment::GridEnvironment() : ctx(SharedPointer<JniContext>()), latch(new SingleLatch), gridName(NULL),
+ metaMgr(new PortableMetadataManager())
+ {
+ // No-op.
+ }
+
+ GridEnvironment::~GridEnvironment()
+ {
+ delete latch;
+
+ if (gridName)
+ delete gridName;
+
+ delete metaMgr;
+ }
+
+ JniHandlers GridEnvironment::GetJniHandlers(SharedPointer<GridEnvironment>* target)
+ {
+ JniHandlers hnds = JniHandlers();
+
+ hnds.target = target;
+
+ hnds.onStart = OnStart;
+ hnds.onStop = OnStop;
+
+ hnds.error = NULL;
+
+ return hnds;
+ }
+
+ void GridEnvironment::Initialize(SharedPointer<JniContext> ctx)
+ {
+ this->ctx = ctx;
+
+ latch->CountDown();
+ }
+
+ char* GridEnvironment::GridName()
+ {
+ return gridName;
+ }
+
+ JniContext* GridEnvironment::Context()
+ {
+ return ctx.Get();
+ }
+
+ SharedPointer<InteropMemory> GridEnvironment::AllocateMemory()
+ {
+ SharedPointer<InteropMemory> ptr(new InteropUnpooledMemory(1024));
+
+ return ptr;
+ }
+
+ SharedPointer<InteropMemory> GridEnvironment::AllocateMemory(int32_t cap)
+ {
+ SharedPointer<InteropMemory> ptr(new InteropUnpooledMemory(cap));
+
+ return ptr;
+ }
+
+ SharedPointer<InteropMemory> GridEnvironment::GetMemory(int64_t memPtr)
+ {
+ int8_t* memPtr0 = reinterpret_cast<int8_t*>(memPtr);
+
+ int32_t flags = InteropMemory::Flags(memPtr0);
+
+ if (InteropMemory::IsExternal(flags))
+ {
+ SharedPointer<InteropMemory> ptr(new InteropExternalMemory(memPtr0));
+
+ return ptr;
+ }
+ else
+ {
+ SharedPointer<InteropMemory> ptr(new InteropUnpooledMemory(memPtr0));
+
+ return ptr;
+ }
+ }
+
+ PortableMetadataManager* GridEnvironment::GetMetadataManager()
+ {
+ return metaMgr;
+ }
+
+ void GridEnvironment::OnStartCallback(long long memPtr)
+ {
+ InteropExternalMemory mem(reinterpret_cast<int8_t*>(memPtr));
+ InteropInputStream stream(&mem);
+
+ PortableReaderImpl reader(&stream);
+
+ int32_t gridNameLen = reader.ReadString(NULL, 0);
+
+ if (gridNameLen >= 0)
+ {
+ gridName = new char[gridNameLen + 1];
+ reader.ReadString(gridName, gridNameLen + 1);
+ }
+ else
+ gridName = NULL;
+ }
+ }
+}
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/grid_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/grid_impl.cpp b/modules/platform/src/main/cpp/core/src/impl/grid_impl.cpp
new file mode 100644
index 0000000..43222e1
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/grid_impl.cpp
@@ -0,0 +1,34 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+#include "gridgain/impl/grid_impl.h"
+
+using namespace ignite::common::concurrent;
+using namespace ignite::common::java;
+
+namespace gridgain
+{
+ namespace impl
+ {
+ GridImpl::GridImpl(SharedPointer<GridEnvironment> env, jobject javaRef) : env(env), javaRef(javaRef)
+ {
+ // No-op.
+ }
+
+ GridImpl::~GridImpl()
+ {
+ JniContext::Release(javaRef);
+ }
+
+ char* GridImpl::GetName()
+ {
+ return env.Get()->GridName();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/handle_registry.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/handle_registry.cpp b/modules/platform/src/main/cpp/core/src/impl/handle_registry.cpp
new file mode 100644
index 0000000..a4eafc0
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/handle_registry.cpp
@@ -0,0 +1,226 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+#include "gridgain/impl/handle_registry.h"
+
+using namespace ignite::common::concurrent;
+
+namespace gridgain
+{
+ namespace impl
+ {
+ HandleRegistryEntry::~HandleRegistryEntry()
+ {
+ // No-op.
+ }
+
+ HandleRegistrySegment::HandleRegistrySegment() :
+ map(new std::map<int64_t, SharedPointer<HandleRegistryEntry>>()), mux(new CriticalSection())
+ {
+ // No-op.
+ }
+
+ HandleRegistrySegment::~HandleRegistrySegment()
+ {
+ delete map;
+ delete mux;
+ }
+
+ SharedPointer<HandleRegistryEntry> HandleRegistrySegment::Get(int64_t hnd)
+ {
+ mux->Enter();
+
+ SharedPointer<HandleRegistryEntry> res = (*map)[hnd];
+
+ mux->Leave();
+
+ return res;
+ }
+
+ void HandleRegistrySegment::Put(int64_t hnd, const SharedPointer<HandleRegistryEntry>& entry)
+ {
+ mux->Enter();
+
+ (*map)[hnd] = entry;
+
+ mux->Leave();
+ }
+
+ void HandleRegistrySegment::Remove(int64_t hnd)
+ {
+ mux->Enter();
+
+ map->erase(hnd);
+
+ mux->Leave();
+ }
+
+ void HandleRegistrySegment::Clear()
+ {
+ mux->Enter();
+
+ map->erase(map->begin(), map->end());
+
+ mux->Leave();
+ }
+
+ HandleRegistry::HandleRegistry(int32_t fastCap, int32_t slowSegmentCnt)
+ {
+ this->fastCap = fastCap;
+
+ fastCtr = 0;
+
+ fast = new SharedPointer<HandleRegistryEntry>[fastCap];
+
+ for (int i = 0; i < fastCap; i++)
+ fast[i] = SharedPointer<HandleRegistryEntry>();
+
+ this->slowSegmentCnt = slowSegmentCnt;
+
+ slowCtr = fastCap;
+
+ slow = new HandleRegistrySegment*[slowSegmentCnt];
+
+ for (int i = 0; i < slowSegmentCnt; i++)
+ slow[i] = new HandleRegistrySegment();
+
+ closed = 0;
+
+ Memory::Fence();
+ }
+
+ HandleRegistry::~HandleRegistry()
+ {
+ Close();
+
+ delete[] fast;
+
+ for (int i = 0; i < slowSegmentCnt; i++)
+ delete slow[i];
+
+ delete[] slow;
+ }
+
+ int64_t HandleRegistry::Allocate(const SharedPointer<HandleRegistryEntry>& target)
+ {
+ return Allocate0(target, false, false);
+ }
+
+ int64_t HandleRegistry::AllocateCritical(const SharedPointer<HandleRegistryEntry>& target)
+ {
+ return Allocate0(target, true, false);
+ }
+
+ int64_t HandleRegistry::AllocateSafe(const SharedPointer<HandleRegistryEntry>& target)
+ {
+ return Allocate0(target, false, true);
+ }
+
+ int64_t HandleRegistry::AllocateCriticalSafe(const SharedPointer<HandleRegistryEntry>& target)
+ {
+ return Allocate0(target, true, true);
+ }
+
+ void HandleRegistry::Release(int64_t hnd)
+ {
+ if (hnd < fastCap)
+ fast[static_cast<int32_t>(hnd)] = SharedPointer<HandleRegistryEntry>();
+ else
+ {
+ HandleRegistrySegment* segment = *(slow + hnd % slowSegmentCnt);
+
+ segment->Remove(hnd);
+ }
+
+ Memory::Fence();
+ }
+
+ SharedPointer<HandleRegistryEntry> HandleRegistry::Get(int64_t hnd)
+ {
+ Memory::Fence();
+
+ if (hnd < fastCap)
+ return fast[static_cast<int32_t>(hnd)];
+ else
+ {
+ HandleRegistrySegment* segment = *(slow + hnd % slowSegmentCnt);
+
+ return segment->Get(hnd);
+ }
+ }
+
+ void HandleRegistry::Close()
+ {
+ if (Atomics::CompareAndSet32(&closed, 0, 1))
+ {
+ // Cleanup fast-path handles.
+ for (int i = 0; i < fastCap; i++)
+ fast[i] = SharedPointer<HandleRegistryEntry>();
+
+ // Cleanup slow-path handles.
+ for (int i = 0; i < slowSegmentCnt; i++)
+ (*(slow + i))->Clear();
+ }
+ }
+
+ int64_t HandleRegistry::Allocate0(const SharedPointer<HandleRegistryEntry>& target, bool critical, bool safe)
+ {
+ // Check closed state.
+ Memory::Fence();
+
+ if (closed == 1)
+ return -1;
+
+ // Try allocating entry on critical path.
+ if (critical)
+ {
+ if (fastCtr < fastCap)
+ {
+ int32_t fastIdx = Atomics::IncrementAndGet32(&fastCtr) - 1;
+
+ if (fastIdx < fastCap)
+ {
+ fast[fastIdx] = target;
+
+ // Double-check for closed state if safe mode is on.
+ Memory::Fence();
+
+ if (safe && closed == 1)
+ {
+ fast[fastIdx] = SharedPointer<HandleRegistryEntry>();
+
+ return -1;
+ }
+ else
+ return fastIdx;
+ }
+ }
+ }
+
+ // Either allocating on slow-path, or fast-path can no longer accomodate more entries.
+ int64_t slowIdx = Atomics::IncrementAndGet64(&slowCtr) - 1;
+
+ HandleRegistrySegment* segment = *(slow + slowIdx % slowSegmentCnt);
+
+ segment->Put(slowIdx, target);
+
+ // Double-check for closed state if safe mode is on.
+ Memory::Fence();
+
+ if (safe && closed == 1)
+ {
+ segment->Remove(slowIdx);
+
+ return -1;
+ }
+
+ return slowIdx;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/interop/interop_input_stream.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/interop/interop_input_stream.cpp b/modules/platform/src/main/cpp/core/src/impl/interop/interop_input_stream.cpp
new file mode 100644
index 0000000..c166847
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/interop/interop_input_stream.cpp
@@ -0,0 +1,207 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+#include <cstring>
+
+#include "gridgain/impl/interop/interop_input_stream.h"
+#include "gridgain/grid_error.h"
+
+/**
+ * Common macro to read a single value.
+ */
+#define GG_INTEROP_IN_READ(type, len) { \
+ EnsureEnoughData(len); \
+ type res = *reinterpret_cast<type*>(data + pos); \
+ Shift(len); \
+ return res; \
+}
+
+/**
+ * Common macro to read an array.
+ */
+#define GG_INTEROP_IN_READ_ARRAY(len, shift) { \
+ CopyAndShift(reinterpret_cast<int8_t*>(res), 0, len << shift); \
+}
+
+namespace gridgain
+{
+ namespace impl
+ {
+ namespace interop
+ {
+ union PortableInt32Float
+ {
+ int32_t i;
+ float f;
+ };
+
+ union PortableInt64Double
+ {
+ int64_t i;
+ double d;
+ };
+
+ InteropInputStream::InteropInputStream(InteropMemory* mem)
+ {
+ this->mem = mem;
+
+ data = mem->Data();
+ len = mem->Length();
+ pos = 0;
+ }
+
+ int8_t InteropInputStream::ReadInt8()
+ {
+ GG_INTEROP_IN_READ(int8_t, 1);
+ }
+
+ void InteropInputStream::ReadInt8Array(int8_t* const res, const int32_t len)
+ {
+ GG_INTEROP_IN_READ_ARRAY(len, 0);
+ }
+
+ bool InteropInputStream::ReadBool()
+ {
+ return ReadInt8() == 1;
+ }
+
+ void InteropInputStream::ReadBoolArray(bool* const res, const int32_t len)
+ {
+ for (int i = 0; i < len; i++)
+ *(res + i) = ReadBool();
+ }
+
+ int16_t InteropInputStream::ReadInt16()
+ {
+ GG_INTEROP_IN_READ(int16_t, 2);
+ }
+
+ void InteropInputStream::ReadInt16Array(int16_t* const res, const int32_t len)
+ {
+ GG_INTEROP_IN_READ_ARRAY(len, 1);
+ }
+
+ uint16_t InteropInputStream::ReadUInt16()
+ {
+ GG_INTEROP_IN_READ(uint16_t, 2);
+ }
+
+ void InteropInputStream::ReadUInt16Array(uint16_t* const res, const int32_t len)
+ {
+ GG_INTEROP_IN_READ_ARRAY(len, 1);
+ }
+
+ int32_t InteropInputStream::ReadInt32()
+ {
+ GG_INTEROP_IN_READ(int32_t, 4);
+ }
+
+ int32_t InteropInputStream::ReadInt32(int32_t pos)
+ {
+ int delta = pos + 4 - this->pos;
+
+ if (delta > 0)
+ EnsureEnoughData(delta);
+
+ return *reinterpret_cast<int32_t*>(data + pos);
+ }
+
+ void InteropInputStream::ReadInt32Array(int32_t* const res, const int32_t len)
+ {
+ GG_INTEROP_IN_READ_ARRAY(len, 2);
+ }
+
+ int64_t InteropInputStream::ReadInt64()
+ {
+ GG_INTEROP_IN_READ(int64_t, 8);
+ }
+
+ void InteropInputStream::ReadInt64Array(int64_t* const res, const int32_t len)
+ {
+ GG_INTEROP_IN_READ_ARRAY(len, 3);
+ }
+
+ float InteropInputStream::ReadFloat()
+ {
+ PortableInt32Float u;
+
+ u.i = ReadInt32();
+
+ return u.f;
+ }
+
+ void InteropInputStream::ReadFloatArray(float* const res, const int32_t len)
+ {
+ GG_INTEROP_IN_READ_ARRAY(len, 2);
+ }
+
+ double InteropInputStream::ReadDouble()
+ {
+ PortableInt64Double u;
+
+ u.i = ReadInt64();
+
+ return u.d;
+ }
+
+ void InteropInputStream::ReadDoubleArray(double* const res, const int32_t len)
+ {
+ GG_INTEROP_IN_READ_ARRAY(len, 3);
+ }
+
+ int32_t InteropInputStream::Remaining()
+ {
+ return len - pos;
+ }
+
+ int32_t InteropInputStream::Position()
+ {
+ return pos;
+ }
+
+ void InteropInputStream::Position(int32_t pos)
+ {
+ if (pos > len) {
+ GG_ERROR_FORMATTED_3(GridError::GG_ERR_MEMORY, "Requested input stream position is out of bounds",
+ "memPtr", mem->PointerLong(), "len", len, "pos", pos);
+ }
+
+ this->pos = pos;
+ }
+
+ void InteropInputStream::Synchronize()
+ {
+ data = mem->Data();
+ len = mem->Length();
+ }
+
+ void InteropInputStream::EnsureEnoughData(int32_t cnt)
+ {
+ if (len - pos < cnt) {
+ GG_ERROR_FORMATTED_4(GridError::GG_ERR_MEMORY, "Not enough data in the stream",
+ "memPtr", mem->PointerLong(), "len", len, "pos", pos, "requested", cnt);
+ }
+ }
+
+ void InteropInputStream::CopyAndShift(int8_t* dest, int32_t off, int32_t cnt)
+ {
+ EnsureEnoughData(cnt);
+
+ memcpy(dest + off, data + pos, cnt);
+
+ Shift(cnt);
+ }
+
+ void InteropInputStream::Shift(int32_t cnt)
+ {
+ pos += cnt;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/interop/interop_memory.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/interop/interop_memory.cpp b/modules/platform/src/main/cpp/core/src/impl/interop/interop_memory.cpp
new file mode 100644
index 0000000..0402894
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/interop/interop_memory.cpp
@@ -0,0 +1,173 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+#include <ignite/common/java.h>
+
+#include "gridgain/impl/interop/interop_memory.h"
+#include "gridgain/grid_error.h"
+
+using namespace ignite::common::java;
+
+namespace gridgain
+{
+ namespace impl
+ {
+ namespace interop
+ {
+ int8_t* InteropMemory::Data(int8_t* memPtr)
+ {
+ return reinterpret_cast<int8_t*>(*reinterpret_cast<int64_t*>(memPtr));
+ }
+
+ void InteropMemory::Data(int8_t* memPtr, void* ptr)
+ {
+ *reinterpret_cast<int64_t*>(memPtr) = reinterpret_cast<int64_t>(ptr);
+ }
+
+ int32_t InteropMemory::Capacity(int8_t* memPtr)
+ {
+ return *reinterpret_cast<int32_t*>(memPtr + GG_MEM_HDR_OFF_CAP);
+ }
+
+ void InteropMemory::Capacity(int8_t* memPtr, int32_t val)
+ {
+ *reinterpret_cast<int32_t*>(memPtr + GG_MEM_HDR_OFF_CAP) = val;
+ }
+
+ int32_t InteropMemory::Length(int8_t* memPtr)
+ {
+ return *reinterpret_cast<int32_t*>(memPtr + GG_MEM_HDR_OFF_LEN);
+ }
+
+ void InteropMemory::Length(int8_t* memPtr, int32_t val)
+ {
+ *reinterpret_cast<int32_t*>(memPtr + GG_MEM_HDR_OFF_LEN) = val;
+ }
+
+ int32_t InteropMemory::Flags(int8_t* memPtr)
+ {
+ return *reinterpret_cast<int32_t*>(memPtr + GG_MEM_HDR_OFF_FLAGS);
+ }
+
+ void InteropMemory::Flags(int8_t* memPtr, int32_t val)
+ {
+ *reinterpret_cast<int32_t*>(memPtr + GG_MEM_HDR_OFF_FLAGS) = val;
+ }
+
+ bool InteropMemory::IsExternal(int8_t* memPtr)
+ {
+ return IsExternal(Flags(memPtr));
+ }
+
+ bool InteropMemory::IsExternal(int32_t flags)
+ {
+ return (flags & GG_MEM_FLAG_EXT) != GG_MEM_FLAG_EXT;
+ }
+
+ bool InteropMemory::IsPooled(int8_t* memPtr)
+ {
+ return IsPooled(Flags(memPtr));
+ }
+
+ bool InteropMemory::IsPooled(int32_t flags)
+ {
+ return (flags & GG_MEM_FLAG_POOLED) != 0;
+ }
+
+ bool InteropMemory::IsAcquired(int8_t* memPtr)
+ {
+ return IsAcquired(Flags(memPtr));
+ }
+
+ bool InteropMemory::IsAcquired(int32_t flags)
+ {
+ return (flags & GG_MEM_FLAG_ACQUIRED) != 0;
+ }
+
+ int8_t* InteropMemory::Pointer()
+ {
+ return memPtr;
+ }
+
+ int64_t InteropMemory::PointerLong()
+ {
+ return reinterpret_cast<int64_t>(memPtr);
+ }
+
+ int8_t* InteropMemory::Data()
+ {
+ return Data(memPtr);
+ }
+
+ int32_t InteropMemory::Capacity()
+ {
+ return Capacity(memPtr);
+ }
+
+ int32_t InteropMemory::Length()
+ {
+ return Length(memPtr);
+ }
+
+ void InteropMemory::Length(int32_t val)
+ {
+ Length(memPtr, val);
+ }
+
+ InteropUnpooledMemory::InteropUnpooledMemory(int32_t cap)
+ {
+ memPtr = static_cast<int8_t*>(malloc(GG_MEM_HDR_LEN));
+
+ Data(memPtr, malloc(cap));
+ Capacity(memPtr, cap);
+ Length(memPtr, 0);
+ Flags(memPtr, GG_MEM_FLAG_EXT);
+
+ owning = true;
+ }
+
+ InteropUnpooledMemory::InteropUnpooledMemory(int8_t* memPtr)
+ {
+ this->memPtr = memPtr;
+ this->owning = false;
+ }
+
+ InteropUnpooledMemory::~InteropUnpooledMemory()
+ {
+ if (owning) {
+ free(Data());
+ free(memPtr);
+ }
+ }
+
+ void InteropUnpooledMemory::Reallocate(int32_t cap)
+ {
+ int doubledCap = Capacity() << 1;
+
+ if (doubledCap > cap)
+ cap = doubledCap;
+
+ Data(memPtr, realloc(Data(memPtr), cap));
+ Capacity(memPtr, cap);
+ }
+
+ InteropExternalMemory::InteropExternalMemory(int8_t* memPtr)
+ {
+ this->memPtr = memPtr;
+ }
+
+ void InteropExternalMemory::Reallocate(int32_t cap)
+ {
+ if (JniContext::Reallocate(reinterpret_cast<int64_t>(memPtr), cap) == -1) {
+ GG_ERROR_FORMATTED_2(GridError::GG_ERR_MEMORY, "Failed to reallocate external memory",
+ "memPtr", PointerLong(), "requestedCapacity", cap)
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/interop/interop_output_stream.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/interop/interop_output_stream.cpp b/modules/platform/src/main/cpp/core/src/impl/interop/interop_output_stream.cpp
new file mode 100644
index 0000000..351f851
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/interop/interop_output_stream.cpp
@@ -0,0 +1,207 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+#include <cstring>
+
+#include "gridgain/impl/interop/interop_output_stream.h"
+#include "gridgain/grid_error.h"
+
+/**
+ * Common macro to write a single value.
+ */
+#define GG_INTEROP_OUT_WRITE(val, type, len) { \
+ EnsureCapacity(pos + len); \
+ *reinterpret_cast<type*>(data + pos) = val; \
+ Shift(len); \
+}
+
+/**
+ * Common macro to write an array.
+ */
+#define GG_INTEROP_OUT_WRITE_ARRAY(val, len) { \
+ CopyAndShift(reinterpret_cast<const int8_t*>(val), 0, len); \
+}
+
+namespace gridgain
+{
+ namespace impl
+ {
+ namespace interop
+ {
+ union PortableFloatInt32
+ {
+ float f;
+ int32_t i;
+ };
+
+ union PortableDoubleInt64
+ {
+ double d;
+ int64_t i;
+ };
+
+ InteropOutputStream::InteropOutputStream(InteropMemory* mem)
+ {
+ this->mem = mem;
+
+ data = mem->Data();
+ cap = mem->Capacity();
+ pos = 0;
+ }
+
+ void InteropOutputStream::WriteInt8(const int8_t val)
+ {
+ GG_INTEROP_OUT_WRITE(val, int8_t, 1);
+ }
+
+ void InteropOutputStream::WriteInt8(const int8_t val, const int32_t pos)
+ {
+ EnsureCapacity(pos + 1);
+
+ *(data + pos) = val;
+ }
+
+ void InteropOutputStream::WriteInt8Array(const int8_t* val, const int32_t len)
+ {
+ GG_INTEROP_OUT_WRITE_ARRAY(val, len);
+ }
+
+ void InteropOutputStream::WriteBool(const bool val)
+ {
+ WriteInt8(val ? 1 : 0);
+ }
+
+ void InteropOutputStream::WriteBoolArray(const bool* val, const int32_t len)
+ {
+ for (int i = 0; i < len; i++)
+ WriteBool(*(val + i));
+ }
+
+ void InteropOutputStream::WriteInt16(const int16_t val)
+ {
+ GG_INTEROP_OUT_WRITE(val, int16_t, 2);
+ }
+
+ void InteropOutputStream::WriteInt16Array(const int16_t* val, const int32_t len)
+ {
+ GG_INTEROP_OUT_WRITE_ARRAY(val, len << 1);
+ }
+
+ void InteropOutputStream::WriteUInt16(const uint16_t val)
+ {
+ GG_INTEROP_OUT_WRITE(val, uint16_t, 2);
+ }
+
+ void InteropOutputStream::WriteUInt16Array(const uint16_t* val, const int32_t len)
+ {
+ GG_INTEROP_OUT_WRITE_ARRAY(val, len << 1);
+ }
+
+ void InteropOutputStream::WriteInt32(const int32_t val)
+ {
+ GG_INTEROP_OUT_WRITE(val, int32_t, 4);
+ }
+
+ void InteropOutputStream::WriteInt32(const int32_t pos, const int32_t val)
+ {
+ EnsureCapacity(pos + 4);
+
+ *reinterpret_cast<int32_t*>(data + pos) = val;
+ }
+
+ void InteropOutputStream::WriteInt32Array(const int32_t* val, const int32_t len)
+ {
+ GG_INTEROP_OUT_WRITE_ARRAY(val, len << 2);
+ }
+
+ void InteropOutputStream::WriteInt64(const int64_t val)
+ {
+ GG_INTEROP_OUT_WRITE(val, int64_t, 8);
+ }
+
+ void InteropOutputStream::WriteInt64Array(const int64_t* val, const int32_t len)
+ {
+ GG_INTEROP_OUT_WRITE_ARRAY(val, len << 3);
+ }
+
+ void InteropOutputStream::WriteFloat(const float val)
+ {
+ PortableFloatInt32 u;
+
+ u.f = val;
+
+ WriteInt32(u.i);
+ }
+
+ void InteropOutputStream::WriteFloatArray(const float* val, const int32_t len)
+ {
+ for (int i = 0; i < len; i++)
+ WriteFloat(*(val + i));
+ }
+
+ void InteropOutputStream::WriteDouble(const double val)
+ {
+ PortableDoubleInt64 u;
+
+ u.d = val;
+
+ WriteInt64(u.i);
+ }
+
+ void InteropOutputStream::WriteDoubleArray(const double* val, const int32_t len)
+ {
+ for (int i = 0; i < len; i++)
+ WriteDouble(*(val + i));
+ }
+
+ int32_t InteropOutputStream::Position()
+ {
+ return pos;
+ }
+
+ void InteropOutputStream::Position(const int32_t val)
+ {
+ EnsureCapacity(val);
+
+ pos = val;
+ }
+
+ void InteropOutputStream::Synchronize()
+ {
+ mem->Length(pos);
+ }
+
+ void InteropOutputStream::EnsureCapacity(int32_t reqCap) {
+ if (reqCap > cap) {
+ int newCap = cap << 1;
+
+ if (newCap < reqCap)
+ newCap = reqCap;
+
+ mem->Reallocate(newCap);
+ data = mem->Data();
+ cap = newCap;
+ }
+ }
+
+ void InteropOutputStream::Shift(int32_t cnt) {
+ pos += cnt;
+ }
+
+ void InteropOutputStream::CopyAndShift(const int8_t* src, int32_t off, int32_t len) {
+ EnsureCapacity(pos + len);
+
+ memcpy(data + pos, src + off, len);
+
+ Shift(len);
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_handler.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_handler.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_handler.cpp
new file mode 100644
index 0000000..08dc65d
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_handler.cpp
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+#include "gridgain/impl/portable/portable_metadata_handler.h"
+
+using namespace ignite::common::concurrent;
+
+namespace gridgain
+{
+ namespace impl
+ {
+ namespace portable
+ {
+ PortableMetadataHandler::PortableMetadataHandler(SPSnap snap) : snap(snap), fieldIds(NULL), fields(NULL)
+ {
+ // No-op.
+ }
+
+ PortableMetadataHandler::~PortableMetadataHandler()
+ {
+ if (fieldIds)
+ delete fieldIds;
+
+ if (fields)
+ delete fields;
+ }
+
+ void PortableMetadataHandler::OnFieldWritten(int32_t fieldId, std::string fieldName, int32_t fieldTypeId)
+ {
+ if (!snap.Get() || !snap.Get()->ContainsFieldId(fieldId))
+ {
+ if (!HasDifference())
+ {
+ fieldIds = new std::set<int32_t>();
+ fields = new std::map<std::string, int32_t>();
+ }
+
+ fieldIds->insert(fieldId);
+ (*fields)[fieldName] = fieldTypeId;
+ }
+ }
+
+ SPSnap PortableMetadataHandler::GetSnapshot()
+ {
+ return snap;
+ }
+
+ bool PortableMetadataHandler::HasDifference()
+ {
+ return fieldIds ? true : false;
+ }
+
+ std::set<int32_t>* PortableMetadataHandler::GetFieldIds()
+ {
+ return fieldIds;
+ }
+
+ std::map<std::string, int32_t>* PortableMetadataHandler::GetFields()
+ {
+ return fields;
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_manager.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_manager.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_manager.cpp
new file mode 100644
index 0000000..d1076a8
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_manager.cpp
@@ -0,0 +1,193 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+#include <ignite/common/concurrent.h>
+
+#include "gridgain/impl/portable/portable_metadata_manager.h"
+
+using namespace ignite::common::concurrent;
+
+namespace gridgain
+{
+ namespace impl
+ {
+ namespace portable
+ {
+ PortableMetadataManager::PortableMetadataManager() :
+ snapshots(SharedPointer<std::map<int32_t, SPSnap>>(new std::map<int32_t, SPSnap>)),
+ pending(new std::vector<SPSnap>()),
+ cs(new CriticalSection()),
+ pendingVer(0), ver(0)
+ {
+ // No-op.
+ }
+
+ PortableMetadataManager::~PortableMetadataManager()
+ {
+ pending->erase(pending->begin(), pending->end());
+
+ delete pending;
+ delete cs;
+ }
+
+ SharedPointer<PortableMetadataHandler> PortableMetadataManager::GetHandler(int32_t typeId)
+ {
+ SharedPointer<std::map<int32_t, SPSnap>> snapshots0 = snapshots;
+
+ SPSnap snapshot = (*snapshots0.Get())[typeId];
+
+ return SharedPointer<PortableMetadataHandler>(new PortableMetadataHandler(snapshot));
+ }
+
+ void PortableMetadataManager::SubmitHandler(std::string typeName, int32_t typeId,
+ PortableMetadataHandler* hnd)
+ {
+ Snap* snap = hnd->GetSnapshot().Get();
+
+ // If this is the very first write of a class or difference exists,
+ // we need to enqueue it for write.
+ if (!snap || hnd->HasDifference())
+ {
+ std::set<int32_t>* newFieldIds = new std::set<int32_t>();
+ std::map<std::string, int32_t>* newFields = new std::map<std::string, int32_t>();
+
+ CopyFields(snap, newFieldIds, newFields);
+
+ if (hnd->HasDifference())
+ {
+ std::set<int32_t>* diffFieldIds = hnd->GetFieldIds();
+ std::map<std::string, int32_t>* diffFields = hnd->GetFields();
+
+ for (std::set<int32_t>::iterator it = diffFieldIds->begin(); it != diffFieldIds->end(); ++it)
+ newFieldIds->insert(*it);
+
+ for (std::map<std::string, int32_t>::iterator it = diffFields->begin(); it != diffFields->end(); ++it)
+ (*newFields)[it->first] = it->second;
+ }
+
+ Snap* diffSnap = new Snap(typeName, typeId, newFieldIds, newFields);
+
+ cs->Enter();
+
+ pending->push_back(SPSnap(diffSnap));
+
+ pendingVer++;
+
+ cs->Leave();
+ }
+ }
+
+ int32_t PortableMetadataManager::GetVersion()
+ {
+ Memory::Fence();
+
+ return ver;
+ }
+
+ bool PortableMetadataManager::IsUpdatedSince(int32_t oldVer)
+ {
+ Memory::Fence();
+
+ return pendingVer > oldVer;
+ }
+
+ bool PortableMetadataManager::ProcessPendingUpdates(PortableMetadataUpdater* updater, GridError* err)
+ {
+ bool success = true; // Optimistically assume that all will be fine.
+
+ cs->Enter();
+
+ for (std::vector<SPSnap>::iterator it = pending->begin(); it != pending->end(); ++it)
+ {
+ Snap* pendingSnap = (*it).Get();
+
+ if (updater->Update(pendingSnap, err))
+ {
+ // Perform copy-on-write update of snapshot collection.
+ std::map<int32_t, SPSnap>* newSnapshots = new std::map<int32_t, SPSnap>();
+
+ bool snapshotFound = false;
+
+ for (std::map<int32_t, SPSnap>::iterator snapIt = snapshots.Get()->begin();
+ snapIt != snapshots.Get()->end(); ++snapIt)
+ {
+ int32_t curTypeId = snapIt->first;
+ Snap* curSnap = snapIt->second.Get();
+
+ if (pendingSnap->GetTypeId() == curTypeId)
+ {
+ // Have to create snapshot with updated fields.
+ std::set<int32_t>* newFieldIds = new std::set<int32_t>();
+ std::map<std::string, int32_t>* newFields = new std::map<std::string, int32_t>();
+
+ // Add old fields.
+ CopyFields(curSnap, newFieldIds, newFields);
+
+ // Add new fields.
+ CopyFields(pendingSnap, newFieldIds, newFields);
+
+ // Create new snapshot.
+ Snap* newSnap = new Snap(pendingSnap->GetTypeName(), pendingSnap->GetTypeId(),
+ newFieldIds, newFields);
+
+ (*newSnapshots)[curTypeId] = SPSnap(newSnap);
+
+ snapshotFound = true;
+ }
+ else
+ (*newSnapshots)[curTypeId] = snapIt->second; // Just transfer exising snapshot.
+ }
+
+ // Handle situation when completely new snapshot is found.
+ if (!snapshotFound)
+ (*newSnapshots)[pendingSnap->GetTypeId()] = *it;
+
+ snapshots = SharedPointer<std::map<int32_t, SPSnap>>(newSnapshots);
+ }
+ else
+ {
+ // Stop as we cannot move further.
+ success = false;
+
+ break;
+ }
+ }
+
+ if (success)
+ {
+ pending->erase(pending->begin(), pending->end());
+
+ ver = pendingVer;
+ }
+
+ cs->Leave();
+
+ return success;
+ }
+
+ void PortableMetadataManager::CopyFields(Snap* snap, std::set<int32_t>* fieldIds,
+ std::map<std::string, int32_t>* fields)
+ {
+ if (snap && snap->HasFields())
+ {
+ std::set<int32_t>* snapFieldIds = snap->GetFieldIds();
+ std::map<std::string, int32_t>* snapFields = snap->GetFields();
+
+ for (std::set<int32_t>::iterator oldIt = snapFieldIds->begin();
+ oldIt != snapFieldIds->end(); ++oldIt)
+ fieldIds->insert(*oldIt);
+
+ for (std::map<std::string, int32_t>::iterator newFieldsIt = snapFields->begin();
+ newFieldsIt != snapFields->end(); ++newFieldsIt)
+ (*fields)[newFieldsIt->first] = newFieldsIt->second;
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_snapshot.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_snapshot.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_snapshot.cpp
new file mode 100644
index 0000000..18cc2ce
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_snapshot.cpp
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+#include "gridgain/impl/portable/portable_metadata_snapshot.h"
+
+namespace gridgain
+{
+ namespace impl
+ {
+ namespace portable
+ {
+ PortableMetadataSnapshot::PortableMetadataSnapshot(std::string typeName, int32_t typeId,
+ std::set<int32_t>* fieldIds, std::map<std::string, int32_t>* fields) :
+ typeName(typeName), typeId(typeId), fieldIds(fieldIds), fields(fields)
+ {
+ // No-op.
+ }
+
+ PortableMetadataSnapshot::~PortableMetadataSnapshot()
+ {
+ delete fieldIds;
+ delete fields;
+ }
+
+ bool PortableMetadataSnapshot::ContainsFieldId(int32_t fieldId)
+ {
+ return fieldIds && fieldIds->count(fieldId) == 1;
+ }
+
+ std::string PortableMetadataSnapshot::GetTypeName()
+ {
+ return typeName;
+ }
+
+ int32_t PortableMetadataSnapshot::GetTypeId()
+ {
+ return typeId;
+ }
+
+ bool PortableMetadataSnapshot::HasFields()
+ {
+ return !fieldIds->empty();
+ }
+
+ std::set<int32_t>* PortableMetadataSnapshot::GetFieldIds()
+ {
+ return fieldIds;
+ }
+
+ std::map<std::string, int32_t>* PortableMetadataSnapshot::GetFields()
+ {
+ return fields;
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater.cpp
new file mode 100644
index 0000000..3022b72
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater.cpp
@@ -0,0 +1,24 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+#include "gridgain/impl/portable/portable_metadata_updater.h"
+
+namespace gridgain
+{
+ namespace impl
+ {
+ namespace portable
+ {
+ PortableMetadataUpdater::~PortableMetadataUpdater()
+ {
+ // No-op.
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater_impl.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater_impl.cpp
new file mode 100644
index 0000000..111ccfd
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater_impl.cpp
@@ -0,0 +1,86 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+#include "gridgain/impl/portable/portable_metadata_updater_impl.h"
+#include <gridgain/impl/interop/interop_output_stream.h>
+#include <gridgain/impl/portable/portable_writer_impl.h>
+#include <gridgain/portable/portable_raw_writer.h>
+
+using namespace ignite::common::concurrent;
+using namespace ignite::common::java;
+using namespace gridgain::impl;
+using namespace gridgain::impl::interop;
+using namespace gridgain::portable;
+
+namespace gridgain
+{
+ namespace impl
+ {
+ namespace portable
+ {
+ /** Operation: Clear. */
+ const int32_t OP_METADATA = -1;
+
+ PortableMetadataUpdaterImpl::PortableMetadataUpdaterImpl(SharedPointer<GridEnvironment> env,
+ jobject javaRef) : env(env), javaRef(javaRef)
+ {
+ // No-op.
+ }
+
+ PortableMetadataUpdaterImpl::~PortableMetadataUpdaterImpl()
+ {
+ // No-op.
+ }
+
+ bool PortableMetadataUpdaterImpl::Update(Snap* snap, GridError* err)
+ {
+ JniErrorInfo jniErr;
+
+ SharedPointer<InteropMemory> mem = env.Get()->AllocateMemory();
+
+ InteropOutputStream out(mem.Get());
+ PortableWriterImpl writer(&out, NULL);
+ PortableRawWriter rawWriter(&writer);
+
+ // We always pass only one meta at a time in current implementation for simplicity.
+ rawWriter.WriteInt32(1);
+
+ rawWriter.WriteInt32(snap->GetTypeId());
+ rawWriter.WriteString(snap->GetTypeName());
+ rawWriter.WriteString(NULL); // Affinity key is not supported for now.
+
+ if (snap->HasFields())
+ {
+ std::map<std::string, int32_t>* fields = snap->GetFields();
+
+ rawWriter.WriteInt32(static_cast<int32_t>(fields->size()));
+
+ for (std::map<std::string, int32_t>::iterator it = fields->begin(); it != fields->end(); ++it)
+ {
+ rawWriter.WriteString(it->first);
+ rawWriter.WriteInt32(it->second);
+ }
+ }
+ else
+ rawWriter.WriteInt32(0);
+
+ out.Synchronize();
+
+ long long res = env.Get()->Context()->TargetInStreamOutLong(javaRef, OP_METADATA, mem.Get()->PointerLong(), &jniErr);
+
+ GridError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+ if (jniErr.code == IGNITE_JNI_ERR_SUCCESS)
+ return res == 1;
+ else
+ return false;
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/portable/portable_reader_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_reader_impl.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_reader_impl.cpp
new file mode 100644
index 0000000..03df9e4
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_reader_impl.cpp
@@ -0,0 +1,675 @@
+/*
+ * Copyright (C) GridGain Systems. All Rights Reserved.
+ * _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+#include "gridgain/impl/interop/interop.h"
+#include "gridgain/impl/portable/portable_common.h"
+#include "gridgain/impl/portable/portable_id_resolver.h"
+#include "gridgain/impl/portable/portable_reader_impl.h"
+#include "gridgain/impl/portable/portable_utils.h"
+#include "gridgain/portable/portable_type.h"
+#include "gridgain/grid_error.h"
+
+using namespace gridgain::impl::interop;
+using namespace gridgain::impl::portable;
+using namespace gridgain::portable;
+
+namespace gridgain
+{
+ namespace impl
+ {
+ namespace portable
+ {
+ PortableReaderImpl::PortableReaderImpl(InteropInputStream* stream, PortableIdResolver* idRslvr,
+ int32_t pos, bool usrType, int32_t typeId, int32_t hashCode, int32_t len, int32_t rawOff) :
+ stream(stream), idRslvr(idRslvr), pos(pos), usrType(usrType), typeId(typeId),
+ hashCode(hashCode), len(len), rawOff(rawOff), rawMode(false),
+ elemIdGen(0), elemId(0), elemCnt(-1), elemRead(0)
+ {
+ // No-op.
+ }
+
+ PortableReaderImpl::PortableReaderImpl(InteropInputStream* stream) :
+ stream(stream), idRslvr(NULL), pos(0), usrType(false), typeId(0), hashCode(0),
+ len(0), rawOff(0), rawMode(true),
+ elemIdGen(0), elemId(0), elemCnt(-1), elemRead(0)
+ {
+ // No-op.
+ }
+
+ int8_t PortableReaderImpl::ReadInt8()
+ {
+ return ReadRaw<int8_t>(PortableUtils::ReadInt8);
+ }
+
+ int32_t PortableReaderImpl::ReadInt8Array(int8_t* res, const int32_t len)
+ {
+ return ReadRawArray<int8_t>(res, len, PortableUtils::ReadInt8Array, GG_TYPE_ARRAY_BYTE);
+ }
+
+ int8_t PortableReaderImpl::ReadInt8(const char* fieldName)
+ {
+ return Read(fieldName, PortableUtils::ReadInt8, GG_TYPE_BYTE, static_cast<int8_t>(0));
+ }
+
+ int32_t PortableReaderImpl::ReadInt8Array(const char* fieldName, int8_t* res, const int32_t len)
+ {
+ return ReadArray<int8_t>(fieldName, res, len,PortableUtils::ReadInt8Array, GG_TYPE_ARRAY_BYTE);
+ }
+
+ bool PortableReaderImpl::ReadBool()
+ {
+ return ReadRaw<bool>(PortableUtils::ReadBool);
+ }
+
+ int32_t PortableReaderImpl::ReadBoolArray(bool* res, const int32_t len)
+ {
+ return ReadRawArray<bool>(res, len, PortableUtils::ReadBoolArray, GG_TYPE_ARRAY_BOOL);
+ }
+
+ bool PortableReaderImpl::ReadBool(const char* fieldName)
+ {
+ return Read(fieldName, PortableUtils::ReadBool, GG_TYPE_BOOL, static_cast<bool>(0));
+ }
+
+ int32_t PortableReaderImpl::ReadBoolArray(const char* fieldName, bool* res, const int32_t len)
+ {
+ return ReadArray<bool>(fieldName, res, len,PortableUtils::ReadBoolArray, GG_TYPE_ARRAY_BOOL);
+ }
+
+ int16_t PortableReaderImpl::ReadInt16()
+ {
+ return ReadRaw<int16_t>(PortableUtils::ReadInt16);
+ }
+
+ int32_t PortableReaderImpl::ReadInt16Array(int16_t* res, const int32_t len)
+ {
+ return ReadRawArray<int16_t>(res, len, PortableUtils::ReadInt16Array, GG_TYPE_ARRAY_SHORT);
+ }
+
+ int16_t PortableReaderImpl::ReadInt16(const char* fieldName)
+ {
+ return Read(fieldName, PortableUtils::ReadInt16, GG_TYPE_SHORT, static_cast<int16_t>(0));
+ }
+
+ int32_t PortableReaderImpl::ReadInt16Array(const char* fieldName, int16_t* res, const int32_t len)
+ {
+ return ReadArray<int16_t>(fieldName, res, len, PortableUtils::ReadInt16Array, GG_TYPE_ARRAY_SHORT);
+ }
+
+ uint16_t PortableReaderImpl::ReadUInt16()
+ {
+ return ReadRaw<uint16_t>(PortableUtils::ReadUInt16);
+ }
+
+ int32_t PortableReaderImpl::ReadUInt16Array(uint16_t* res, const int32_t len)
+ {
+ return ReadRawArray<uint16_t>(res, len, PortableUtils::ReadUInt16Array, GG_TYPE_ARRAY_CHAR);
+ }
+
+ uint16_t PortableReaderImpl::ReadUInt16(const char* fieldName)
+ {
+ return Read(fieldName, PortableUtils::ReadUInt16, GG_TYPE_CHAR, static_cast<uint16_t>(0));
+ }
+
+ int32_t PortableReaderImpl::ReadUInt16Array(const char* fieldName, uint16_t* res, const int32_t len)
+ {
+ return ReadArray<uint16_t>(fieldName, res, len,PortableUtils::ReadUInt16Array, GG_TYPE_ARRAY_CHAR);
+ }
+
+ int32_t PortableReaderImpl::ReadInt32()
+ {
+ return ReadRaw<int32_t>(PortableUtils::ReadInt32);
+ }
+
+ int32_t PortableReaderImpl::ReadInt32Array(int32_t* res, const int32_t len)
+ {
+ return ReadRawArray<int32_t>(res, len, PortableUtils::ReadInt32Array, GG_TYPE_ARRAY_INT);
+ }
+
+ int32_t PortableReaderImpl::ReadInt32(const char* fieldName)
+ {
+ return Read(fieldName, PortableUtils::ReadInt32, GG_TYPE_INT, static_cast<int32_t>(0));
+ }
+
+ int32_t PortableReaderImpl::ReadInt32Array(const char* fieldName, int32_t* res, const int32_t len)
+ {
+ return ReadArray<int32_t>(fieldName, res, len,PortableUtils::ReadInt32Array, GG_TYPE_ARRAY_INT);
+ }
+
+ int64_t PortableReaderImpl::ReadInt64()
+ {
+ return ReadRaw<int64_t>(PortableUtils::ReadInt64);
+ }
+
+ int32_t PortableReaderImpl::ReadInt64Array(int64_t* res, const int32_t len)
+ {
+ return ReadRawArray<int64_t>(res, len, PortableUtils::ReadInt64Array, GG_TYPE_ARRAY_LONG);
+ }
+
+ int64_t PortableReaderImpl::ReadInt64(const char* fieldName)
+ {
+ return Read(fieldName, PortableUtils::ReadInt64, GG_TYPE_LONG, static_cast<int64_t>(0));
+ }
+
+ int32_t PortableReaderImpl::ReadInt64Array(const char* fieldName, int64_t* res, const int32_t len)
+ {
+ return ReadArray<int64_t>(fieldName, res, len,PortableUtils::ReadInt64Array, GG_TYPE_ARRAY_LONG);
+ }
+
+ float PortableReaderImpl::ReadFloat()
+ {
+ return ReadRaw<float>(PortableUtils::ReadFloat);
+ }
+
+ int32_t PortableReaderImpl::ReadFloatArray(float* res, const int32_t len)
+ {
+ return ReadRawArray<float>(res, len, PortableUtils::ReadFloatArray, GG_TYPE_ARRAY_FLOAT);
+ }
+
+ float PortableReaderImpl::ReadFloat(const char* fieldName)
+ {
+ return Read(fieldName, PortableUtils::ReadFloat, GG_TYPE_FLOAT, static_cast<float>(0));
+ }
+
+ int32_t PortableReaderImpl::ReadFloatArray(const char* fieldName, float* res, const int32_t len)
+ {
+ return ReadArray<float>(fieldName, res, len,PortableUtils::ReadFloatArray, GG_TYPE_ARRAY_FLOAT);
+ }
+
+ double PortableReaderImpl::ReadDouble()
+ {
+ return ReadRaw<double>(PortableUtils::ReadDouble);
+ }
+
+ int32_t PortableReaderImpl::ReadDoubleArray(double* res, const int32_t len)
+ {
+ return ReadRawArray<double>(res, len, PortableUtils::ReadDoubleArray, GG_TYPE_ARRAY_DOUBLE);
+ }
+
+ double PortableReaderImpl::ReadDouble(const char* fieldName)
+ {
+ return Read(fieldName, PortableUtils::ReadDouble, GG_TYPE_DOUBLE, static_cast<double>(0));
+ }
+
+ int32_t PortableReaderImpl::ReadDoubleArray(const char* fieldName, double* res, const int32_t len)
+ {
+ return ReadArray<double>(fieldName, res, len,PortableUtils::ReadDoubleArray, GG_TYPE_ARRAY_DOUBLE);
+ }
+
+ Guid PortableReaderImpl::ReadGuid()
+ {
+ CheckRawMode(true);
+ CheckSingleMode(true);
+
+ return ReadNullable(stream, PortableUtils::ReadGuid, GG_TYPE_UUID);
+ }
+
+ int32_t PortableReaderImpl::ReadGuidArray(Guid* res, const int32_t len)
+ {
+ CheckRawMode(true);
+ CheckSingleMode(true);
+
+ return ReadArrayInternal<Guid>(res, len, stream, ReadGuidArrayInternal, GG_TYPE_ARRAY_UUID);
+ }
+
+ Guid PortableReaderImpl::ReadGuid(const char* fieldName)
+ {
+ CheckRawMode(false);
+ CheckSingleMode(true);
+
+ int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
+ int32_t fieldLen = SeekField(fieldId);
+
+ if (fieldLen > 0)
+ return ReadNullable(stream, PortableUtils::ReadGuid, GG_TYPE_UUID);
+
+ return Guid();
+ }
+
+ int32_t PortableReaderImpl::ReadGuidArray(const char* fieldName, Guid* res, const int32_t len)
+ {
+ CheckRawMode(false);
+ CheckSingleMode(true);
+
+ int32_t pos = stream->Position();
+
+ int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
+ int32_t fieldLen = SeekField(fieldId);
+
+ if (fieldLen > 0) {
+ int32_t realLen = ReadArrayInternal<Guid>(res, len, stream, ReadGuidArrayInternal, GG_TYPE_ARRAY_UUID);
+
+ // If actual read didn't occur return to initial position so that we do not perform
+ // N jumps to find the field again, where N is total amount of fields.
+ if (realLen != -1 && (!res || realLen > len))
+ stream->Position(pos);
+
+ return realLen;
+ }
+
+ return -1;
+ }
+
+ void PortableReaderImpl::ReadGuidArrayInternal(InteropInputStream* stream, Guid* res, const int32_t len)
+ {
+ for (int i = 0; i < len; i++)
+ *(res + i) = ReadNullable<Guid>(stream, PortableUtils::ReadGuid, GG_TYPE_UUID);
+ }
+
+ int32_t PortableReaderImpl::ReadString(char* res, const int32_t len)
+ {
+ CheckRawMode(true);
+ CheckSingleMode(true);
+
+ return ReadStringInternal(res, len);
+ }
+
+ int32_t PortableReaderImpl::ReadString(const char* fieldName, char* res, const int32_t len)
+ {
+ CheckRawMode(false);
+ CheckSingleMode(true);
+
+ int32_t pos = stream->Position();
+
+ int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
+ int32_t fieldLen = SeekField(fieldId);
+
+ if (fieldLen > 0) {
+ int32_t realLen = ReadStringInternal(res, len);
+
+ // If actual read didn't occur return to initial position so that we do not perform
+ // N jumps to find the field again, where N is total amount of fields.
+ if (realLen != -1 && (!res || realLen > len))
+ stream->Position(pos);
+
+ return realLen;
+ }
+
+ return -1;
+ }
+
+ int32_t PortableReaderImpl::ReadStringArray(int32_t* size)
+ {
+ return StartContainerSession(true, GG_TYPE_ARRAY_STRING, size);
+ }
+
+ int32_t PortableReaderImpl::ReadStringArray(const char* fieldName, int32_t* size)
+ {
+ CheckRawMode(false);
+ CheckSingleMode(true);
+
+ int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
+ int32_t fieldLen = SeekField(fieldId);
+
+ if (fieldLen > 0)
+ return StartContainerSession(false, GG_TYPE_ARRAY_STRING, size);
+ else {
+ *size = -1;
+
+ return ++elemIdGen;
+ }
+ }
+
+ int32_t PortableReaderImpl::ReadStringElement(int32_t id, char* res, const int32_t len)
+ {
+ CheckSession(id);
+
+ int32_t posBefore = stream->Position();
+
+ int32_t realLen = ReadStringInternal(res, len);
+
+ int32_t posAfter = stream->Position();
+
+ if (posAfter > posBefore && ++elemRead == elemCnt) {
+ elemId = 0;
+ elemCnt = -1;
+ elemRead = 0;
+ }
+
+ return realLen;
+ }
+
+ int32_t PortableReaderImpl::ReadStringInternal(char* res, const int32_t len)
+ {
+ int8_t hdr = stream->ReadInt8();
+
+ if (hdr == GG_TYPE_STRING) {
+ bool utf8Mode = stream->ReadBool();
+ int32_t realLen = stream->ReadInt32();
+
+ if (res && len >= realLen) {
+ if (utf8Mode)
+ {
+ for (int i = 0; i < realLen; i++)
+ *(res + i) = static_cast<char>(stream->ReadInt8());
+ }
+ else
+ {
+ for (int i = 0; i < realLen; i++)
+ *(res + i) = static_cast<char>(stream->ReadUInt16());
+ }
+
+ if (len > realLen)
+ *(res + realLen) = 0; // Set NULL terminator if possible.
+ }
+ else
+ stream->Position(stream->Position() - 6);
+
+ return realLen;
+ }
+ else if (hdr != GG_HDR_NULL)
+ ThrowOnInvalidHeader(GG_TYPE_ARRAY, hdr);
+
+ return -1;
+ }
+
+ int32_t PortableReaderImpl::ReadArray(int32_t* size)
+ {
+ return StartContainerSession(true, GG_TYPE_ARRAY, size);
+ }
+
+ int32_t PortableReaderImpl::ReadArray(const char* fieldName, int32_t* size)
+ {
+ CheckRawMode(false);
+ CheckSingleMode(true);
+
+ int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
+ int32_t fieldLen = SeekField(fieldId);
+
+ if (fieldLen > 0)
+ return StartContainerSession(false, GG_TYPE_ARRAY, size);
+ else {
+ *size = -1;
+
+ return ++elemIdGen;
+ }
+ }
+
+ int32_t PortableReaderImpl::ReadCollection(CollectionType* typ, int32_t* size)
+ {
+ int32_t id = StartContainerSession(true, GG_TYPE_COLLECTION, size);
+
+ if (*size == -1)
+ *typ = GG_COLLECTION_UNDEFINED;
+ else
+ *typ = static_cast<CollectionType>(stream->ReadInt8());
+
+ return id;
+ }
+
+ int32_t PortableReaderImpl::ReadCollection(const char* fieldName, CollectionType* typ, int32_t* size)
+ {
+ CheckRawMode(false);
+ CheckSingleMode(true);
+
+ int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
+ int32_t fieldLen = SeekField(fieldId);
+
+ if (fieldLen > 0)
+ {
+ int32_t id = StartContainerSession(false, GG_TYPE_COLLECTION, size);
+
+ if (*size == -1)
+ *typ = GG_COLLECTION_UNDEFINED;
+ else
+ *typ = static_cast<CollectionType>(stream->ReadInt8());
+
+ return id;
+ }
+ else {
+ *typ = GG_COLLECTION_UNDEFINED;
+ *size = -1;
+
+ return ++elemIdGen;
+ }
+ }
+
+ int32_t PortableReaderImpl::ReadMap(MapType* typ, int32_t* size)
+ {
+ int32_t id = StartContainerSession(true, GG_TYPE_MAP, size);
+
+ if (*size == -1)
+ *typ = GG_MAP_UNDEFINED;
+ else
+ *typ = static_cast<MapType>(stream->ReadInt8());
+
+ return id;
+ }
+
+ int32_t PortableReaderImpl::ReadMap(const char* fieldName, MapType* typ, int32_t* size)
+ {
+ CheckRawMode(false);
+ CheckSingleMode(true);
+
+ int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
+ int32_t fieldLen = SeekField(fieldId);
+
+ if (fieldLen > 0)
+ {
+ int32_t id = StartContainerSession(false, GG_TYPE_MAP, size);
+
+ if (*size == -1)
+ *typ = GG_MAP_UNDEFINED;
+ else
+ *typ = static_cast<MapType>(stream->ReadInt8());
+
+ return id;
+ }
+ else {
+ *typ = GG_MAP_UNDEFINED;
+ *size = -1;
+
+ return ++elemIdGen;
+ }
+ }
+
+ bool PortableReaderImpl::HasNextElement(int32_t id)
+ {
+ return elemId == id && elemRead < elemCnt;
+ }
+
+ void PortableReaderImpl::SetRawMode()
+ {
+ CheckRawMode(false);
+ CheckSingleMode(true);
+
+ stream->Position(pos + rawOff);
+ rawMode = true;
+ }
+
+ template <>
+ int8_t PortableReaderImpl::ReadTopObject<int8_t>()
+ {
+ return ReadTopObject0(GG_TYPE_BYTE, PortableUtils::ReadInt8, static_cast<int8_t>(0));
+ }
+
+ template <>
+ bool PortableReaderImpl::ReadTopObject<bool>()
+ {
+ return ReadTopObject0(GG_TYPE_BOOL, PortableUtils::ReadBool, static_cast<bool>(0));
+ }
+
+ template <>
+ int16_t PortableReaderImpl::ReadTopObject<int16_t>()
+ {
+ return ReadTopObject0(GG_TYPE_SHORT, PortableUtils::ReadInt16, static_cast<int16_t>(0));
+ }
+
+ template <>
+ uint16_t PortableReaderImpl::ReadTopObject<uint16_t>()
+ {
+ return ReadTopObject0(GG_TYPE_CHAR, PortableUtils::ReadUInt16, static_cast<uint16_t>(0));
+ }
+
+ template <>
+ int32_t PortableReaderImpl::ReadTopObject<int32_t>()
+ {
+ return ReadTopObject0(GG_TYPE_INT, PortableUtils::ReadInt32, static_cast<int32_t>(0));
+ }
+
+ template <>
+ int64_t PortableReaderImpl::ReadTopObject<int64_t>()
+ {
+ return ReadTopObject0(GG_TYPE_LONG, PortableUtils::ReadInt64, static_cast<int64_t>(0));
+ }
+
+ template <>
+ float PortableReaderImpl::ReadTopObject<float>()
+ {
+ return ReadTopObject0(GG_TYPE_FLOAT, PortableUtils::ReadFloat, static_cast<float>(0));
+ }
+
+ template <>
+ double PortableReaderImpl::ReadTopObject<double>()
+ {
+ return ReadTopObject0(GG_TYPE_DOUBLE, PortableUtils::ReadDouble, static_cast<double>(0));
+ }
+
+ template <>
+ Guid PortableReaderImpl::ReadTopObject<Guid>()
+ {
+ int8_t typeId = stream->ReadInt8();
+
+ if (typeId == GG_TYPE_UUID)
+ return PortableUtils::ReadGuid(stream);
+ else if (typeId == GG_HDR_NULL)
+ return Guid();
+ else {
+ int32_t pos = stream->Position() - 1;
+
+ GG_ERROR_FORMATTED_3(GridError::GG_ERR_PORTABLE, "Invalid header", "position", pos, "expected", GG_TYPE_UUID, "actual", typeId)
+ }
+ }
+
+ InteropInputStream* PortableReaderImpl::GetStream()
+ {
+ return stream;
+ }
+
+ int32_t PortableReaderImpl::SeekField(const int32_t fieldId)
+ {
+ // We assume that it is very likely that fields are read in the same
+ // order as they were initially written. So we start seeking field
+ // from current stream position making a "loop" up to this position.
+ int32_t marker = stream->Position();
+
+ for (int32_t curPos = marker; curPos < pos + rawOff;)
+ {
+ int32_t curFieldId = stream->ReadInt32();
+ int32_t curFieldLen = stream->ReadInt32();
+
+ if (fieldId == curFieldId)
+ return curFieldLen;
+ else {
+ curPos = stream->Position() + curFieldLen;
+
+ stream->Position(curPos);
+ }
+ }
+
+ stream->Position(pos + 18);
+
+ for (int32_t curPos = stream->Position(); curPos < marker;)
+ {
+ int32_t curFieldId = stream->ReadInt32();
+ int32_t curFieldLen = stream->ReadInt32();
+
+ if (fieldId == curFieldId)
+ return curFieldLen;
+ else {
+ curPos = stream->Position() + curFieldLen;
+
+ stream->Position(curPos);
+ }
+ }
+
+ return -1;
+ }
+
+ void PortableReaderImpl::CheckRawMode(bool expected)
+ {
+ if (expected && !rawMode) {
+ GG_ERROR_1(GridError::GG_ERR_PORTABLE, "Operation can be performed only in raw mode.")
+ }
+ else if (!expected && rawMode) {
+ GG_ERROR_1(GridError::GG_ERR_PORTABLE, "Operation cannot be performed in raw mode.")
+ }
+ }
+
+ void PortableReaderImpl::CheckSingleMode(bool expected)
+ {
+ if (expected && elemId != 0) {
+ GG_ERROR_1(GridError::GG_ERR_PORTABLE, "Operation cannot be performed when container is being read.");
+ }
+ else if (!expected && elemId == 0) {
+ GG_ERROR_1(GridError::GG_ERR_PORTABLE, "Operation can be performed only when container is being read.");
+ }
+ }
+
+ int32_t PortableReaderImpl::StartContainerSession(bool expRawMode, int8_t expHdr, int32_t* size)
+ {
+ CheckRawMode(expRawMode);
+ CheckSingleMode(true);
+
+ int8_t hdr = stream->ReadInt8();
+
+ if (hdr == expHdr)
+ {
+ int32_t cnt = stream->ReadInt32();
+
+ if (cnt != 0)
+ {
+ elemId = ++elemIdGen;
+ elemCnt = cnt;
+ elemRead = 0;
+
+ *size = cnt;
+
+ return elemId;
+ }
+ else
+ {
+ *size = 0;
+
+ return ++elemIdGen;
+ }
+ }
+ else if (hdr == GG_HDR_NULL) {
+ *size = -1;
+
+ return ++elemIdGen;
+ }
+ else {
+ ThrowOnInvalidHeader(expHdr, hdr);
+
+ return 0;
+ }
+ }
+
+ void PortableReaderImpl::CheckSession(int32_t expSes)
+ {
+ if (elemId != expSes) {
+ GG_ERROR_1(GridError::GG_ERR_PORTABLE, "Containter read session has been finished or is not started yet.");
+ }
+ }
+
+ void PortableReaderImpl::ThrowOnInvalidHeader(int32_t pos, int8_t expHdr, int8_t hdr)
+ {
+ GG_ERROR_FORMATTED_3(GridError::GG_ERR_PORTABLE, "Invalid header", "position", pos, "expected", expHdr, "actual", hdr)
+ }
+
+ void PortableReaderImpl::ThrowOnInvalidHeader(int8_t expHdr, int8_t hdr)
+ {
+ int32_t pos = stream->Position() - 1;
+
+ ThrowOnInvalidHeader(pos, expHdr, hdr);
+ }
+ }
+ }
+}
\ No newline at end of file