You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/10/30 13:22:12 UTC
[2/6] ignite git commit: IGNITE-1770: Implemented constant-time field
lookup on protocol level.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp b/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp
index a8196a1..644088b 100644
--- a/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp
@@ -35,18 +35,19 @@ namespace ignite
namespace portable
{
PortableReaderImpl::PortableReaderImpl(InteropInputStream* stream, PortableIdResolver* idRslvr,
- int32_t pos, bool usrType, int32_t typeId, int32_t hashCode, int32_t len, int32_t rawOff) :
+ int32_t pos, bool usrType, int32_t typeId, int32_t hashCode, int32_t len, int32_t rawOff,
+ int32_t footerBegin, int32_t footerEnd) :
stream(stream), idRslvr(idRslvr), pos(pos), usrType(usrType), typeId(typeId),
- hashCode(hashCode), len(len), rawOff(rawOff), rawMode(false),
- elemIdGen(0), elemId(0), elemCnt(-1), elemRead(0)
+ hashCode(hashCode), len(len), rawOff(rawOff), rawMode(false), elemIdGen(0), elemId(0),
+ elemCnt(-1), elemRead(0), footerBegin(footerBegin), footerEnd(footerEnd)
{
// No-op.
}
PortableReaderImpl::PortableReaderImpl(InteropInputStream* stream) :
- stream(stream), idRslvr(NULL), pos(0), usrType(false), typeId(0), hashCode(0),
- len(0), rawOff(0), rawMode(true),
- elemIdGen(0), elemId(0), elemCnt(-1), elemRead(0)
+ stream(stream), idRslvr(NULL), pos(0), usrType(false), typeId(0), hashCode(0), len(0),
+ rawOff(0), rawMode(true), elemIdGen(0), elemId(0), elemCnt(-1), elemRead(0), footerBegin(-1),
+ footerEnd(-1)
{
// No-op.
}
@@ -233,12 +234,14 @@ namespace ignite
CheckSingleMode(true);
int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
- int32_t fieldLen = SeekField(fieldId);
+ int32_t fieldPos = FindField(fieldId);
- if (fieldLen > 0)
- return ReadNullable(stream, PortableUtils::ReadGuid, IGNITE_TYPE_UUID);
+ if (fieldPos <= 0)
+ return Guid();
+
+ stream->Position(fieldPos);
- return Guid();
+ return ReadNullable(stream, PortableUtils::ReadGuid, IGNITE_TYPE_UUID);
}
int32_t PortableReaderImpl::ReadGuidArray(const char* fieldName, Guid* res, const int32_t len)
@@ -249,20 +252,60 @@ namespace ignite
int32_t pos = stream->Position();
int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
- int32_t fieldLen = SeekField(fieldId);
+ int32_t fieldPos = FindField(fieldId);
- if (fieldLen > 0) {
- int32_t realLen = ReadArrayInternal<Guid>(res, len, stream, ReadGuidArrayInternal, IGNITE_TYPE_ARRAY_UUID);
+ if (fieldPos <= 0)
+ return -1;
- // If actual read didn't occur return to initial position so that we do not perform
- // N jumps to find the field again, where N is total amount of fields.
- if (realLen != -1 && (!res || realLen > len))
- stream->Position(pos);
+ stream->Position(fieldPos);
- return realLen;
+ int32_t realLen = ReadArrayInternal<Guid>(res, len, stream, ReadGuidArrayInternal, IGNITE_TYPE_ARRAY_UUID);
+
+ return realLen;
+ }
+
+ void PortableReaderImpl::ParseHeaderIfNeeded() // Reads the full object header from the stream and derives footerBegin, footerEnd and rawOff from it.
+ {
+ if (footerBegin) // Any non-zero value skips parsing. NOTE(review): the stream-only constructor sets footerBegin to -1, which also takes this early return - confirm that is intended.
+ return;
+
+ InteropStreamPositionGuard<InteropInputStream> posGuard(*stream); // presumably restores the stream position when this scope exits - confirm against the guard's definition
+
+ int8_t hdr = stream->ReadInt8();
+
+ if (hdr != IGNITE_HDR_FULL)
+ IGNITE_ERROR_2(ignite::IgniteError::IGNITE_ERR_PORTABLE, "Invalid header: ", hdr);
+
+ int8_t protoVer = stream->ReadInt8();
+
+ if (protoVer != IGNITE_PROTO_VER) {
+ IGNITE_ERROR_2(ignite::IgniteError::IGNITE_ERR_PORTABLE,
+ "Unsupported portable protocol version: ", protoVer);
}
- return -1;
+ int16_t flags = stream->ReadInt16();
+ int32_t typeId = stream->ReadInt32(); // NOTE(review): local declaration; the member typeId initialised by the constructors is not updated - confirm intent
+ int32_t hashCode = stream->ReadInt32(); // NOTE(review): local declaration; the member hashCode is not updated
+ int32_t len = stream->ReadInt32(); // NOTE(review): local declaration; the member len is not updated, the local is used below
+ int32_t schemaId = stream->ReadInt32(); // read to advance past the field; not used afterwards in this function
+ int32_t schemaOrRawOff = stream->ReadInt32(); // holds the raw offset for raw-only objects, otherwise the footer start (see branches below)
+
+ if (flags & IGNITE_PORTABLE_FLAG_RAW_ONLY)
+ {
+ footerBegin = len; // footerBegin equals the object length here, so footerEnd computed below equals footerBegin
+
+ rawOff = schemaOrRawOff;
+ }
+ else
+ {
+ footerBegin = schemaOrRawOff;
+
+ rawOff = (len - footerBegin) % 8 ? stream->ReadInt32(pos + len - 4) : schemaOrRawOff; // footer size not a multiple of 8: raw offset is read from the last 4 bytes of the object
+ }
+
+ footerEnd = len - ((len - footerBegin) % 8); // excludes the trailing remainder so the footer spans whole 8-byte entries
+
+ bool usrType = flags & IGNITE_PORTABLE_FLAG_USER_OBJECT; // NOTE(review): local variable that is never read; the member usrType is left unchanged
}
void PortableReaderImpl::ReadGuidArrayInternal(InteropInputStream* stream, Guid* res, const int32_t len)
@@ -287,20 +330,16 @@ namespace ignite
int32_t pos = stream->Position();
int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
- int32_t fieldLen = SeekField(fieldId);
+ int32_t fieldPos = FindField(fieldId);
- if (fieldLen > 0) {
- int32_t realLen = ReadStringInternal(res, len);
+ if (fieldPos <= 0)
+ return -1;
- // If actual read didn't occur return to initial position so that we do not perform
- // N jumps to find the field again, where N is total amount of fields.
- if (realLen != -1 && (!res || realLen > len))
- stream->Position(pos);
+ stream->Position(fieldPos);
- return realLen;
- }
+ int32_t realLen = ReadStringInternal(res, len);
- return -1;
+ return realLen;
}
int32_t PortableReaderImpl::ReadStringArray(int32_t* size)
@@ -314,15 +353,18 @@ namespace ignite
CheckSingleMode(true);
int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
- int32_t fieldLen = SeekField(fieldId);
+ int32_t fieldPos = FindField(fieldId);
- if (fieldLen > 0)
- return StartContainerSession(false, IGNITE_TYPE_ARRAY_STRING, size);
- else {
+ if (fieldPos <= 0)
+ {
*size = -1;
return ++elemIdGen;
}
+
+ stream->Position(fieldPos);
+
+ return StartContainerSession(false, IGNITE_TYPE_ARRAY_STRING, size);
}
int32_t PortableReaderImpl::ReadStringElement(int32_t id, char* res, const int32_t len)
@@ -389,15 +431,18 @@ namespace ignite
CheckSingleMode(true);
int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
- int32_t fieldLen = SeekField(fieldId);
+ int32_t fieldPos = FindField(fieldId);
- if (fieldLen > 0)
- return StartContainerSession(false, IGNITE_TYPE_ARRAY, size);
- else {
+ if (fieldPos <= 0)
+ {
*size = -1;
return ++elemIdGen;
}
+
+ stream->Position(fieldPos);
+
+ return StartContainerSession(false, IGNITE_TYPE_ARRAY, size);
}
int32_t PortableReaderImpl::ReadCollection(CollectionType* typ, int32_t* size)
@@ -418,25 +463,26 @@ namespace ignite
CheckSingleMode(true);
int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
- int32_t fieldLen = SeekField(fieldId);
+ int32_t fieldPos = FindField(fieldId);
- if (fieldLen > 0)
+ if (fieldPos <= 0)
{
- int32_t id = StartContainerSession(false, IGNITE_TYPE_COLLECTION, size);
-
- if (*size == -1)
- *typ = IGNITE_COLLECTION_UNDEFINED;
- else
- *typ = static_cast<CollectionType>(stream->ReadInt8());
-
- return id;
- }
- else {
*typ = IGNITE_COLLECTION_UNDEFINED;
*size = -1;
return ++elemIdGen;
}
+
+ stream->Position(fieldPos);
+
+ int32_t id = StartContainerSession(false, IGNITE_TYPE_COLLECTION, size);
+
+ if (*size == -1)
+ *typ = IGNITE_COLLECTION_UNDEFINED;
+ else
+ *typ = static_cast<CollectionType>(stream->ReadInt8());
+
+ return id;
}
int32_t PortableReaderImpl::ReadMap(MapType* typ, int32_t* size)
@@ -457,25 +503,26 @@ namespace ignite
CheckSingleMode(true);
int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
- int32_t fieldLen = SeekField(fieldId);
+ int32_t fieldPos = FindField(fieldId);
- if (fieldLen > 0)
+ if (fieldPos <= 0)
{
- int32_t id = StartContainerSession(false, IGNITE_TYPE_MAP, size);
-
- if (*size == -1)
- *typ = IGNITE_MAP_UNDEFINED;
- else
- *typ = static_cast<MapType>(stream->ReadInt8());
-
- return id;
- }
- else {
*typ = IGNITE_MAP_UNDEFINED;
*size = -1;
return ++elemIdGen;
}
+
+ stream->Position(fieldPos);
+
+ int32_t id = StartContainerSession(false, IGNITE_TYPE_MAP, size);
+
+ if (*size == -1)
+ *typ = IGNITE_MAP_UNDEFINED;
+ else
+ *typ = static_cast<MapType>(stream->ReadInt8());
+
+ return id;
}
CollectionType PortableReaderImpl::ReadCollectionTypeUnprotected()
@@ -504,11 +551,13 @@ namespace ignite
InteropStreamPositionGuard<InteropInputStream> positionGuard(*stream);
int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
- int32_t fieldLen = SeekField(fieldId);
+ int32_t fieldPos = FindField(fieldId);
- if (fieldLen <= 0)
+ if (fieldPos <= 0)
return IGNITE_COLLECTION_UNDEFINED;
+ stream->Position(fieldPos);
+
return ReadCollectionTypeUnprotected();
}
@@ -544,11 +593,13 @@ namespace ignite
InteropStreamPositionGuard<InteropInputStream> positionGuard(*stream);
int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
- int32_t fieldLen = SeekField(fieldId);
+ int32_t fieldPos = FindField(fieldId);
- if (fieldLen <= 0)
+ if (fieldPos <= 0)
return -1;
+ stream->Position(fieldPos);
+
return ReadCollectionSizeUnprotected();
}
@@ -635,41 +686,18 @@ namespace ignite
return stream;
}
- int32_t PortableReaderImpl::SeekField(const int32_t fieldId)
+ int32_t PortableReaderImpl::FindField(const int32_t fieldId)
{
- // We assume that it is very likely that fields are read in the same
- // order as they were initially written. So we start seeking field
- // from current stream position making a "loop" up to this position.
- int32_t marker = stream->Position();
-
- for (int32_t curPos = marker; curPos < pos + rawOff;)
- {
- int32_t curFieldId = stream->ReadInt32();
- int32_t curFieldLen = stream->ReadInt32();
+ InteropStreamPositionGuard<InteropInputStream> streamGuard(*stream);
- if (fieldId == curFieldId)
- return curFieldLen;
- else {
- curPos = stream->Position() + curFieldLen;
+ stream->Position(footerBegin);
- stream->Position(curPos);
- }
- }
-
- stream->Position(pos + IGNITE_FULL_HDR_LEN);
-
- for (int32_t curPos = stream->Position(); curPos < marker;)
+ for (int32_t schemaPos = footerBegin; schemaPos < footerEnd; schemaPos += 8)
{
- int32_t curFieldId = stream->ReadInt32();
- int32_t curFieldLen = stream->ReadInt32();
+ int32_t currentFieldId = stream->ReadInt32(schemaPos);
- if (fieldId == curFieldId)
- return curFieldLen;
- else {
- curPos = stream->Position() + curFieldLen;
-
- stream->Position(curPos);
- }
+ if (fieldId == currentFieldId)
+ return stream->ReadInt32(schemaPos + 4) + pos;
}
return -1;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/cpp/core/src/impl/portable/portable_schema.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/portable/portable_schema.cpp b/modules/platforms/cpp/core/src/impl/portable/portable_schema.cpp
new file mode 100644
index 0000000..57c23a7
--- /dev/null
+++ b/modules/platforms/cpp/core/src/impl/portable/portable_schema.cpp
@@ -0,0 +1,88 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "ignite/impl/portable/portable_schema.h"
+#include "ignite/impl/portable/portable_writer_impl.h"
+
+/** FNV1 hash offset basis. */
+enum { FNV1_OFFSET_BASIS = 0x811C9DC5 };
+
+/** FNV1 hash prime. */
+enum { FNV1_PRIME = 0x01000193 };
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace portable
+        {
+            /**
+             * Constructor. Creates an empty schema: the identifier is zero and
+             * a new field container is allocated.
+             */
+            PortableSchema::PortableSchema(): id(0), fieldsInfo(new FieldContainer())
+            {
+                // No-op.
+            }
+
+            /**
+             * Destructor. Releases the field container allocated by the constructor.
+             */
+            PortableSchema::~PortableSchema()
+            {
+                delete fieldsInfo;
+            }
+
+            /**
+             * Record a field and fold its identifier into the schema identifier.
+             *
+             * The schema identifier is an FNV1-style hash over the four bytes of
+             * every field identifier, lowest byte first.
+             *
+             * @param fieldId Field identifier.
+             * @param offset Field offset stored alongside the identifier.
+             */
+            void PortableSchema::AddField(int32_t fieldId, int32_t offset)
+            {
+                if (!id)
+                {
+                    // Initialize offset when the first field is written.
+                    id = static_cast<int32_t>(FNV1_OFFSET_BASIS);
+                }
+
+                // Advance schema hash. The arithmetic is done on unsigned values:
+                // the multiplication is expected to wrap around, and wrapping is
+                // only well-defined for unsigned integers (signed overflow is
+                // undefined behaviour).
+                uint32_t hash = static_cast<uint32_t>(id);
+                const uint32_t fieldBits = static_cast<uint32_t>(fieldId);
+                const uint32_t prime = static_cast<uint32_t>(FNV1_PRIME);
+
+                for (int shift = 0; shift < 32; shift += 8)
+                {
+                    hash ^= (fieldBits >> shift) & 0xFFu;
+                    hash *= prime;
+                }
+
+                id = static_cast<int32_t>(hash);
+
+                PortableSchemaFieldInfo info = { fieldId, offset };
+                fieldsInfo->push_back(info);
+            }
+
+            /**
+             * Write every recorded field as an (id, offset) pair of 32-bit
+             * integers, in the order the fields were added.
+             *
+             * @param out Stream to write to.
+             */
+            void PortableSchema::Write(interop::InteropOutputStream& out) const
+            {
+                for (FieldContainer::const_iterator i = fieldsInfo->begin(); i != fieldsInfo->end(); ++i)
+                {
+                    out.WriteInt32(i->id);
+                    out.WriteInt32(i->offset);
+                }
+            }
+
+            /**
+             * Check whether any field has been recorded.
+             *
+             * @return True if no fields have been added.
+             */
+            bool PortableSchema::Empty() const
+            {
+                return fieldsInfo->empty();
+            }
+
+            /**
+             * Reset the schema: the identifier becomes zero and all recorded
+             * fields are dropped.
+             */
+            void PortableSchema::Clear()
+            {
+                id = 0;
+                fieldsInfo->clear();
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/cpp/core/src/impl/portable/portable_writer_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/portable/portable_writer_impl.cpp b/modules/platforms/cpp/core/src/impl/portable/portable_writer_impl.cpp
index b16a934..4b65b4b 100644
--- a/modules/platforms/cpp/core/src/impl/portable/portable_writer_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/portable/portable_writer_impl.cpp
@@ -16,6 +16,7 @@
*/
#include "ignite/impl/portable/portable_writer_impl.h"
+#include "ignite/impl/interop/interop_stream_position_guard.h"
#include "ignite/ignite_error.h"
using namespace ignite::impl::interop;
@@ -29,16 +30,16 @@ namespace ignite
namespace portable
{
PortableWriterImpl::PortableWriterImpl(InteropOutputStream* stream, PortableIdResolver* idRslvr,
- PortableMetadataManager* metaMgr, PortableMetadataHandler* metaHnd) :
+ PortableMetadataManager* metaMgr, PortableMetadataHandler* metaHnd, int32_t start) :
stream(stream), idRslvr(idRslvr), metaMgr(metaMgr), metaHnd(metaHnd), typeId(idRslvr->GetTypeId()),
- elemIdGen(0), elemId(0), elemCnt(0), elemPos(-1), rawPos(-1)
+ elemIdGen(0), elemId(0), elemCnt(0), elemPos(-1), rawPos(-1), start(start)
{
// No-op.
}
PortableWriterImpl::PortableWriterImpl(InteropOutputStream* stream, PortableMetadataManager* metaMgr) :
stream(stream), idRslvr(NULL), metaMgr(metaMgr), metaHnd(NULL), typeId(0),
- elemIdGen(0), elemId(0), elemCnt(0), elemPos(-1), rawPos(0)
+ elemIdGen(0), elemId(0), elemCnt(0), elemPos(-1), rawPos(0), start(stream->Position())
{
// No-op.
}
@@ -240,7 +241,7 @@ namespace ignite
CheckRawMode(false);
CheckSingleMode(true);
- WriteFieldIdAndLength(fieldName, IGNITE_TYPE_UUID, 1 + 16);
+ WriteFieldId(fieldName, IGNITE_TYPE_UUID);
stream->WriteInt8(IGNITE_TYPE_UUID);
@@ -256,7 +257,6 @@ namespace ignite
if (val)
{
- stream->WriteInt32(5 + len * 17);
stream->WriteInt8(IGNITE_TYPE_ARRAY_UUID);
stream->WriteInt32(len);
@@ -269,7 +269,6 @@ namespace ignite
}
else
{
- stream->WriteInt32(1);
stream->WriteInt8(IGNITE_HDR_NULL);
}
}
@@ -298,21 +297,15 @@ namespace ignite
if (val)
{
- int32_t lenPos = stream->Position();
- stream->Position(lenPos + 4);
-
stream->WriteInt8(IGNITE_TYPE_STRING);
stream->WriteBool(false);
stream->WriteInt32(len);
for (int i = 0; i < len; i++)
stream->WriteUInt16(*(val + i));
-
- stream->WriteInt32(lenPos, stream->Position() - lenPos - 4);
}
else
{
- stream->WriteInt32(1);
stream->WriteInt8(IGNITE_HDR_NULL);
}
}
@@ -331,7 +324,7 @@ namespace ignite
{
StartContainerSession(false);
- WriteFieldIdSkipLength(fieldName, IGNITE_TYPE_ARRAY_STRING);
+ WriteFieldId(fieldName, IGNITE_TYPE_ARRAY_STRING);
stream->WriteInt8(IGNITE_TYPE_ARRAY_STRING);
stream->Position(stream->Position() + 4);
@@ -368,7 +361,7 @@ namespace ignite
CheckRawMode(false);
CheckSingleMode(true);
- WriteFieldIdAndLength(fieldName, IGNITE_TYPE_OBJECT, 1);
+ WriteFieldId(fieldName, IGNITE_TYPE_OBJECT);
stream->WriteInt8(IGNITE_HDR_NULL);
}
@@ -386,7 +379,7 @@ namespace ignite
{
StartContainerSession(false);
- WriteFieldIdSkipLength(fieldName, IGNITE_TYPE_ARRAY);
+ WriteFieldId(fieldName, IGNITE_TYPE_ARRAY);
stream->WriteInt8(IGNITE_TYPE_ARRAY);
stream->Position(stream->Position() + 4);
@@ -409,7 +402,7 @@ namespace ignite
{
StartContainerSession(false);
- WriteFieldIdSkipLength(fieldName, IGNITE_TYPE_COLLECTION);
+ WriteFieldId(fieldName, IGNITE_TYPE_COLLECTION);
stream->WriteInt8(IGNITE_TYPE_COLLECTION);
stream->Position(stream->Position() + 4);
@@ -433,7 +426,7 @@ namespace ignite
{
StartContainerSession(false);
- WriteFieldIdSkipLength(fieldName, IGNITE_TYPE_MAP);
+ WriteFieldId(fieldName, IGNITE_TYPE_MAP);
stream->WriteInt8(IGNITE_TYPE_MAP);
stream->Position(stream->Position() + 4);
@@ -446,15 +439,7 @@ namespace ignite
{
CheckSession(id);
- if (rawPos == -1)
- {
- int32_t len = stream->Position() - elemPos - 4;
-
- stream->WriteInt32(elemPos + 4, len);
- stream->WriteInt32(elemPos + 9, elemCnt);
- }
- else
- stream->WriteInt32(elemPos + 1, elemCnt);
+ stream->WriteInt32(elemPos + 1, elemCnt);
elemId = 0;
elemCnt = 0;
@@ -516,27 +501,14 @@ namespace ignite
void PortableWriterImpl::WriteFieldId(const char* fieldName, int32_t fieldTypeId)
{
int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName);
-
- stream->WriteInt32(fieldId);
+ int32_t fieldOff = stream->Position() - start;
+
+ schema.AddField(fieldId, fieldOff);
if (metaHnd)
metaHnd->OnFieldWritten(fieldId, fieldName, fieldTypeId);
}
- void PortableWriterImpl::WriteFieldIdSkipLength(const char* fieldName, int32_t fieldTypeId)
- {
- WriteFieldId(fieldName, fieldTypeId);
-
- stream->Position(stream->Position() + 4);
- }
-
- void PortableWriterImpl::WriteFieldIdAndLength(const char* fieldName, int32_t fieldTypeId, int32_t len)
- {
- WriteFieldId(fieldName, fieldTypeId);
-
- stream->WriteInt32(len);
- }
-
template <>
void PortableWriterImpl::WriteTopObject<int8_t>(const int8_t& obj)
{
@@ -591,6 +563,50 @@ namespace ignite
WriteTopObject0<Guid>(obj, PortableUtils::WriteGuid, IGNITE_TYPE_UUID);
}
+ void PortableWriterImpl::PostWrite() // Finalizes the object that begins at 'start': appends the schema (if any) and fills in the header length, schema id and schema/raw offset fields.
+ {
+ int32_t lenWithoutSchema = stream->Position() - start; // bytes written since 'start', before any schema footer is appended
+
+ if (schema.Empty()) // no named fields were recorded
+ {
+ InteropStreamPositionGuard<InteropOutputStream> guard(*stream); // presumably restores the stream position on scope exit - confirm against the guard's definition
+
+ stream->Position(start + IGNITE_OFFSET_FLAGS);
+ stream->WriteInt16(IGNITE_PORTABLE_FLAG_USER_OBJECT | IGNITE_PORTABLE_FLAG_RAW_ONLY); // mark the object as raw-only
+
+ stream->WriteInt32(start + IGNITE_OFFSET_LEN, lenWithoutSchema);
+ stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_ID, 0);
+ stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_OR_RAW_OFF, GetRawPosition() - start); // NOTE(review): GetRawPosition() is evaluated after the stream position was moved above - confirm it does not depend on the current position
+ }
+ else
+ {
+ int32_t schemaId = schema.GetId(); // taken before WriteAndClearSchema() clears the schema
+
+ WriteAndClearSchema();
+
+ if (rawPos > 0) // presumably true only when raw data was written - confirm; the raw offset is then appended after the schema
+ stream->WriteInt32(rawPos - start);
+
+ int32_t length = stream->Position() - start; // total length, including the schema and the optional raw offset
+
+ stream->WriteInt32(start + IGNITE_OFFSET_LEN, length);
+ stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_ID, schemaId);
+ stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_OR_RAW_OFF, lenWithoutSchema); // with a schema present this field holds the offset at which the schema begins
+ }
+ }
+
+ bool PortableWriterImpl::HasSchema() const // Reports whether at least one field has been recorded in the schema.
+ {
+ return !schema.Empty();
+ }
+
+ void PortableWriterImpl::WriteAndClearSchema() // Writes the recorded schema to the stream at its current position, then resets the schema.
+ {
+ schema.Write(*stream);
+
+ schema.Clear(); // resets the schema so it no longer reports any fields
+ }
+
InteropOutputStream* PortableWriterImpl::GetStream()
{
return stream;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Portable/PortableWriteBenchmark.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Portable/PortableWriteBenchmark.cs b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Portable/PortableWriteBenchmark.cs
index 7815106..c8fd30b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Portable/PortableWriteBenchmark.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Portable/PortableWriteBenchmark.cs
@@ -78,7 +78,7 @@ namespace Apache.Ignite.Benchmarks.Portable
TypeConfigurations = new List<PortableTypeConfiguration>
{
new PortableTypeConfiguration(typeof (Address)) {MetadataEnabled = true},
- new PortableTypeConfiguration(typeof (TestModel)) {MetadataEnabled = false}
+ //new PortableTypeConfiguration(typeof (TestModel)) {MetadataEnabled = false}
}
});
}
@@ -90,7 +90,7 @@ namespace Apache.Ignite.Benchmarks.Portable
protected override void GetDescriptors(ICollection<BenchmarkOperationDescriptor> descs)
{
descs.Add(BenchmarkOperationDescriptor.Create("WriteAddress", WriteAddress, 1));
- descs.Add(BenchmarkOperationDescriptor.Create("WriteTestModel", WriteTestModel, 1));
+ //descs.Add(BenchmarkOperationDescriptor.Create("WriteTestModel", WriteTestModel, 1));
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index e4450b6..ffe5d9f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -171,6 +171,7 @@
<Compile Include="Impl\Common\CopyOnWriteConcurrentDictionary.cs" />
<Compile Include="Impl\Common\DelegateConverter.cs" />
<Compile Include="Impl\Common\DelegateTypeDescriptor.cs" />
+ <Compile Include="Impl\Common\Fnv1Hash.cs" />
<Compile Include="Impl\Common\Future.cs" />
<Compile Include="Impl\Common\FutureConverter.cs" />
<Compile Include="Impl\Common\FutureType.cs" />
@@ -179,6 +180,7 @@
<Compile Include="Impl\Common\IFutureInternal.cs" />
<Compile Include="Impl\Common\IgniteHome.cs" />
<Compile Include="Impl\Common\LoadedAssembliesResolver.cs" />
+ <Compile Include="Impl\Common\ResizeableArray.cs" />
<Compile Include="Impl\Common\TypeCaster.cs" />
<Compile Include="Impl\Compute\Closure\ComputeAbstractClosureTask.cs" />
<Compile Include="Impl\Compute\Closure\ComputeActionJob.cs" />
@@ -253,6 +255,9 @@
<Compile Include="Impl\Portable\PortableMarshaller.cs" />
<Compile Include="Impl\Portable\PortableMode.cs" />
<Compile Include="Impl\Portable\PortableObjectHandle.cs" />
+ <Compile Include="Impl\Portable\PortableObjectHeader.cs" />
+ <Compile Include="Impl\Portable\PortableObjectSchema.cs" />
+ <Compile Include="Impl\Portable\PortableObjectSchemaField.cs" />
<Compile Include="Impl\Portable\PortableReaderExtensions.cs" />
<Compile Include="Impl\Portable\PortableReaderHandleDictionary.cs" />
<Compile Include="Impl\Portable\PortableReaderImpl.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Fnv1Hash.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Fnv1Hash.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Fnv1Hash.cs
new file mode 100644
index 0000000..26bbe7c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Fnv1Hash.cs
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+    /// <summary>
+    /// Fowler–Noll–Vo hash function.
+    /// </summary>
+    internal static class Fnv1Hash
+    {
+        /** Basis. */
+        public const int Basis = unchecked((int) 0x811C9DC5);
+
+        /** Prime. */
+        public const int Prime = 0x01000193;
+
+        /// <summary>
+        /// Updates the hashcode with next int. The four bytes of <paramref name="next"/>
+        /// are mixed in one at a time, lowest byte first.
+        /// </summary>
+        /// <param name="current">The current.</param>
+        /// <param name="next">The next.</param>
+        /// <returns>Updated hashcode.</returns>
+        public static int Update(int current, int next)
+        {
+            // The multiplication is expected to wrap around; force an unchecked context
+            // so that the result does not depend on the assembly's overflow-checking setting.
+            unchecked
+            {
+                for (int shift = 0; shift < 32; shift += 8)
+                {
+                    current ^= (next >> shift) & 0xFF;
+                    current *= Prime;
+                }
+
+                return current;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ResizeableArray.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ResizeableArray.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ResizeableArray.cs
new file mode 100644
index 0000000..cb432e5
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ResizeableArray.cs
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+    using System.Collections.Generic;
+
+    /// <summary>
+    /// Simple append-only <see cref="List{T}"/> alternative which exposes internal array.
+    /// </summary>
+    internal class ResizeableArray<T>
+    {
+        /** Capacity used on first growth when the array was created empty. */
+        private const int MinGrowCapacity = 4;
+
+        /** Array. */
+        private T[] _arr;
+
+        /** Items count. */
+        private int _count;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="capacity">Initial capacity; zero is allowed.</param>
+        public ResizeableArray(int capacity)
+        {
+            _arr = new T[capacity];
+        }
+
+        /// <summary>
+        /// Internal array. Its length is the current capacity, which may exceed <see cref="Count"/>;
+        /// the reference changes whenever the array grows.
+        /// </summary>
+        public T[] Array
+        {
+            get { return _arr; }
+        }
+
+        /// <summary>
+        /// Number of elements added so far.
+        /// </summary>
+        public int Count
+        {
+            get { return _count; }
+        }
+
+        /// <summary>
+        /// Add element, growing the internal array when it is full.
+        /// </summary>
+        /// <param name="element">Element.</param>
+        public void Add(T element)
+        {
+            if (_count == _arr.Length)
+            {
+                // Doubling a zero-length array would leave it empty, so start from a small capacity.
+                int newCapacity = _arr.Length == 0 ? MinGrowCapacity : _arr.Length * 2;
+
+                System.Array.Resize(ref _arr, newCapacity);
+            }
+
+            _arr[_count++] = element;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs
index b7ea4d6..44766c2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs
@@ -21,6 +21,7 @@ namespace Apache.Ignite.Core.Impl.Memory
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Text;
+ using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Portable.IO;
/// <summary>
@@ -92,6 +93,8 @@ namespace Apache.Ignite.Core.Impl.Memory
/** <inheritdoc /> */
public void WriteByteArray(byte[] val)
{
+ IgniteArgumentCheck.NotNull(val, "val");
+
fixed (byte* val0 = val)
{
CopyFromAndShift(val0, val.Length);
@@ -107,6 +110,8 @@ namespace Apache.Ignite.Core.Impl.Memory
/** <inheritdoc /> */
public void WriteBoolArray(bool[] val)
{
+ IgniteArgumentCheck.NotNull(val, "val");
+
fixed (bool* val0 = val)
{
CopyFromAndShift((byte*)val0, val.Length);
@@ -124,6 +129,8 @@ namespace Apache.Ignite.Core.Impl.Memory
/** <inheritdoc /> */
public virtual void WriteShortArray(short[] val)
{
+ IgniteArgumentCheck.NotNull(val, "val");
+
fixed (short* val0 = val)
{
CopyFromAndShift((byte*)val0, val.Length << Shift2);
@@ -141,6 +148,8 @@ namespace Apache.Ignite.Core.Impl.Memory
/** <inheritdoc /> */
public virtual void WriteCharArray(char[] val)
{
+ IgniteArgumentCheck.NotNull(val, "val");
+
fixed (char* val0 = val)
{
CopyFromAndShift((byte*)val0, val.Length << Shift2);
@@ -167,6 +176,8 @@ namespace Apache.Ignite.Core.Impl.Memory
/** <inheritdoc /> */
public virtual void WriteIntArray(int[] val)
{
+ IgniteArgumentCheck.NotNull(val, "val");
+
fixed (int* val0 = val)
{
CopyFromAndShift((byte*)val0, val.Length << Shift4);
@@ -184,6 +195,8 @@ namespace Apache.Ignite.Core.Impl.Memory
/** <inheritdoc /> */
public virtual void WriteLongArray(long[] val)
{
+ IgniteArgumentCheck.NotNull(val, "val");
+
fixed (long* val0 = val)
{
CopyFromAndShift((byte*)val0, val.Length << Shift8);
@@ -201,6 +214,8 @@ namespace Apache.Ignite.Core.Impl.Memory
/** <inheritdoc /> */
public virtual void WriteFloatArray(float[] val)
{
+ IgniteArgumentCheck.NotNull(val, "val");
+
fixed (float* val0 = val)
{
CopyFromAndShift((byte*)val0, val.Length << Shift4);
@@ -218,6 +233,8 @@ namespace Apache.Ignite.Core.Impl.Memory
/** <inheritdoc /> */
public virtual void WriteDoubleArray(double[] val)
{
+ IgniteArgumentCheck.NotNull(val, "val");
+
fixed (double* val0 = val)
{
CopyFromAndShift((byte*)val0, val.Length << Shift8);
@@ -227,6 +244,9 @@ namespace Apache.Ignite.Core.Impl.Memory
/** <inheritdoc /> */
public int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding)
{
+ IgniteArgumentCheck.NotNull(charCnt, "charCnt");
+ IgniteArgumentCheck.NotNull(byteCnt, "byteCnt");
+
int curPos = EnsureWriteCapacityAndShift(byteCnt);
return encoding.GetBytes(chars, charCnt, _data + curPos, byteCnt);
@@ -235,6 +255,8 @@ namespace Apache.Ignite.Core.Impl.Memory
/** <inheritdoc /> */
public void Write(byte[] src, int off, int cnt)
{
+ IgniteArgumentCheck.NotNull(src, "src");
+
fixed (byte* src0 = src)
{
CopyFromAndShift(src0 + off, cnt);
@@ -260,7 +282,6 @@ namespace Apache.Ignite.Core.Impl.Memory
}
/** <inheritdoc /> */
-
public byte[] ReadByteArray(int cnt)
{
int curPos = EnsureReadCapacityAndShift(cnt);
@@ -423,6 +444,8 @@ namespace Apache.Ignite.Core.Impl.Memory
/** <inheritdoc /> */
public void Read(byte[] dest, int off, int cnt)
{
+ IgniteArgumentCheck.NotNull(dest, "dest");
+
fixed (byte* dest0 = dest)
{
Read(dest0 + off, cnt);
@@ -633,9 +656,9 @@ namespace Apache.Ignite.Core.Impl.Memory
}
/** <inheritdoc /> */
- public int Remaining()
+ public int Remaining
{
- return _len - _pos;
+ get { return _len - _pos; }
}
/** <inheritdoc /> */
@@ -675,13 +698,13 @@ namespace Apache.Ignite.Core.Impl.Memory
#region ARRAYS
/** <inheritdoc /> */
- public byte[] Array()
+ public byte[] GetArray()
{
- return ArrayCopy();
+ return GetArrayCopy();
}
/** <inheritdoc /> */
- public byte[] ArrayCopy()
+ public byte[] GetArrayCopy()
{
byte[] res = new byte[_mem.Length];
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableTypeDescriptor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableTypeDescriptor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableTypeDescriptor.cs
index 3872773..88a7e22 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableTypeDescriptor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/IPortableTypeDescriptor.cs
@@ -133,5 +133,10 @@ namespace Apache.Ignite.Core.Impl.Portable
/// <param name="pathIdx">Path index.</param>
/// <param name="updates">Recorded updates.</param>
void UpdateReadStructure(PortableStructure exp, int pathIdx, IList<PortableStructureUpdate> updates);
+
+ /// <summary>
+ /// Gets the schema.
+ /// </summary>
+ PortableObjectSchema Schema { get; }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/IPortableStream.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/IPortableStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/IPortableStream.cs
index 73d5a51..80087e4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/IPortableStream.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/IPortableStream.cs
@@ -289,20 +289,20 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
/// <summary>
/// Gets remaining bytes in the stream.
/// </summary>
- /// <returns>Remaining bytes.</returns>
- int Remaining();
+ /// <value>Remaining bytes.</value>
+ int Remaining { get; }
/// <summary>
/// Gets underlying array, avoiding copying if possible.
/// </summary>
/// <returns>Underlying array.</returns>
- byte[] Array();
+ byte[] GetArray();
/// <summary>
/// Gets underlying data in a new array.
/// </summary>
/// <returns>New array with data.</returns>
- byte[] ArrayCopy();
+ byte[] GetArrayCopy();
/// <summary>
/// Check whether array passed as argument is the same as the stream hosts.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableAbstractStream.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableAbstractStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableAbstractStream.cs
index f84b5a3..0cd3342 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableAbstractStream.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableAbstractStream.cs
@@ -18,28 +18,16 @@
namespace Apache.Ignite.Core.Impl.Portable.IO
{
using System;
- using System.Diagnostics.CodeAnalysis;
using System.IO;
- using System.Reflection;
using System.Text;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Memory;
/// <summary>
/// Base class for managed and unmanaged data streams.
/// </summary>
internal unsafe abstract class PortableAbstractStream : IPortableStream
{
- /// <summary>
- /// Array copy delegate.
- /// </summary>
- delegate void MemCopy(byte* a1, byte* a2, int len);
-
- /** memcpy function handle. */
- private static readonly MemCopy Memcpy;
-
- /** Whether src and dest arguments are inverted. */
- [SuppressMessage("Microsoft.Performance", "CA1802:UseLiteralsWhereAppropriate")]
- private static readonly bool MemcpyInverted;
-
/** Byte: zero. */
private const byte ByteZero = 0;
@@ -56,37 +44,6 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
private bool _disposed;
/// <summary>
- /// Static initializer.
- /// </summary>
- [SuppressMessage("Microsoft.Design", "CA1065:DoNotRaiseExceptionsInUnexpectedLocations")]
- [SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline")]
- static PortableAbstractStream()
- {
- Type type = typeof(Buffer);
-
- const BindingFlags flags = BindingFlags.Static | BindingFlags.NonPublic;
- Type[] paramTypes = { typeof(byte*), typeof(byte*), typeof(int) };
-
- // Assume .Net 4.5.
- MethodInfo mthd = type.GetMethod("Memcpy", flags, null, paramTypes, null);
-
- MemcpyInverted = true;
-
- if (mthd == null)
- {
- // Assume .Net 4.0.
- mthd = type.GetMethod("memcpyimpl", flags, null, paramTypes, null);
-
- MemcpyInverted = false;
-
- if (mthd == null)
- throw new InvalidOperationException("Unable to get memory copy function delegate.");
- }
-
- Memcpy = (MemCopy)Delegate.CreateDelegate(typeof(MemCopy), mthd);
- }
-
- /// <summary>
/// Write byte.
/// </summary>
/// <param name="val">Byte value.</param>
@@ -1076,15 +1033,15 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
/// <summary>
/// Internal read routine.
/// </summary>
+ /// <param name="src">Source</param>
/// <param name="dest">Destination.</param>
/// <param name="cnt">Count.</param>
- /// <param name="data">Data (source).</param>
/// <returns>Amount of bytes written.</returns>
- protected void ReadInternal(byte* dest, int cnt, byte* data)
+ protected void ReadInternal(byte* src, byte* dest, int cnt)
{
- int cnt0 = Math.Min(Remaining(), cnt);
+ int cnt0 = Math.Min(Remaining, cnt);
- CopyMemory(data + Pos, dest, cnt0);
+ CopyMemory(src + Pos, dest, cnt0);
ShiftRead(cnt0);
}
@@ -1100,10 +1057,10 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
/// <summary>
/// Gets remaining bytes in the stream.
/// </summary>
- /// <returns>
- /// Remaining bytes.
- /// </returns>
- public abstract int Remaining();
+ /// <value>
+ /// Remaining bytes.
+ /// </value>
+ public abstract int Remaining { get; }
/// <summary>
/// Gets underlying array, avoiding copying if possible.
@@ -1111,7 +1068,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
/// <returns>
/// Underlying array.
/// </returns>
- public abstract byte[] Array();
+ public abstract byte[] GetArray();
/// <summary>
/// Gets underlying data in a new array.
@@ -1119,7 +1076,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
/// <returns>
/// New array with data.
/// </returns>
- public abstract byte[] ArrayCopy();
+ public abstract byte[] GetArrayCopy();
/// <summary>
/// Check whether array passed as argument is the same as the stream hosts.
@@ -1291,10 +1248,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
/// <param name="len">Length.</param>
private static void CopyMemory(byte* src, byte* dest, int len)
{
- if (MemcpyInverted)
- Memcpy.Invoke(dest, src, len);
- else
- Memcpy.Invoke(src, dest, len);
+ PlatformMemoryUtils.CopyMemory(src, dest, len);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs
index 690f92c..b7d001e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs
@@ -18,8 +18,11 @@
namespace Apache.Ignite.Core.Impl.Portable.IO
{
using System;
+ using System.Diagnostics;
using System.IO;
using System.Text;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Memory;
/// <summary>
/// Portable onheap stream.
@@ -27,7 +30,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
internal unsafe class PortableHeapStream : PortableAbstractStream
{
/** Data array. */
- protected byte[] Data;
+ private byte[] _data;
/// <summary>
/// Constructor.
@@ -35,7 +38,9 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
/// <param name="cap">Initial capacity.</param>
public PortableHeapStream(int cap)
{
- Data = new byte[cap];
+ Debug.Assert(cap >= 0);
+
+ _data = new byte[cap];
}
/// <summary>
@@ -44,7 +49,9 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
/// <param name="data">Data array.</param>
public PortableHeapStream(byte[] data)
{
- Data = data;
+ Debug.Assert(data != null);
+
+ _data = data;
}
/** <inheritdoc /> */
@@ -52,7 +59,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
int pos0 = EnsureWriteCapacityAndShift(1);
- Data[pos0] = val;
+ _data[pos0] = val;
}
/** <inheritdoc /> */
@@ -60,7 +67,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
int pos0 = EnsureReadCapacityAndShift(1);
- return Data[pos0];
+ return _data[pos0];
}
/** <inheritdoc /> */
@@ -68,7 +75,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
int pos0 = EnsureWriteCapacityAndShift(val.Length);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
WriteByteArray0(val, data0 + pos0);
}
@@ -79,7 +86,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
int pos0 = EnsureReadCapacityAndShift(cnt);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
return ReadByteArray0(cnt, data0 + pos0);
}
@@ -90,7 +97,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
int pos0 = EnsureWriteCapacityAndShift(val.Length);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
WriteBoolArray0(val, data0 + pos0);
}
@@ -101,7 +108,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
int pos0 = EnsureReadCapacityAndShift(cnt);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
return ReadBoolArray0(cnt, data0 + pos0);
}
@@ -112,7 +119,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
int pos0 = EnsureWriteCapacityAndShift(2);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
WriteShort0(val, data0 + pos0);
}
@@ -123,7 +130,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
int pos0 = EnsureReadCapacityAndShift(2);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
return ReadShort0(data0 + pos0);
}
@@ -136,7 +143,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
int pos0 = EnsureWriteCapacityAndShift(cnt);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
WriteShortArray0(val, data0 + pos0, cnt);
}
@@ -149,7 +156,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
int pos0 = EnsureReadCapacityAndShift(cnt0);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
return ReadShortArray0(cnt, data0 + pos0, cnt0);
}
@@ -162,7 +169,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
int pos0 = EnsureWriteCapacityAndShift(cnt);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
WriteCharArray0(val, data0 + pos0, cnt);
}
@@ -175,7 +182,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
int pos0 = EnsureReadCapacityAndShift(cnt0);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
return ReadCharArray0(cnt, data0 + pos0, cnt0);
}
@@ -186,7 +193,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
int pos0 = EnsureWriteCapacityAndShift(4);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
WriteInt0(val, data0 + pos0);
}
@@ -197,7 +204,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
EnsureWriteCapacity(writePos + 4);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
WriteInt0(val, data0 + writePos);
}
@@ -208,7 +215,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
int pos0 = EnsureReadCapacityAndShift(4);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
return ReadInt0(data0 + pos0);
}
@@ -221,7 +228,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
int pos0 = EnsureWriteCapacityAndShift(cnt);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
WriteIntArray0(val, data0 + pos0, cnt);
}
@@ -234,7 +241,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
int pos0 = EnsureReadCapacityAndShift(cnt0);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
return ReadIntArray0(cnt, data0 + pos0, cnt0);
}
@@ -247,7 +254,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
int pos0 = EnsureWriteCapacityAndShift(cnt);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
WriteFloatArray0(val, data0 + pos0, cnt);
}
@@ -260,7 +267,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
int pos0 = EnsureReadCapacityAndShift(cnt0);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
return ReadFloatArray0(cnt, data0 + pos0, cnt0);
}
@@ -271,7 +278,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
int pos0 = EnsureWriteCapacityAndShift(8);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
WriteLong0(val, data0 + pos0);
}
@@ -282,7 +289,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
int pos0 = EnsureReadCapacityAndShift(8);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
return ReadLong0(data0 + pos0);
}
@@ -295,7 +302,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
int pos0 = EnsureWriteCapacityAndShift(cnt);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
WriteLongArray0(val, data0 + pos0, cnt);
}
@@ -308,7 +315,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
int pos0 = EnsureReadCapacityAndShift(cnt0);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
return ReadLongArray0(cnt, data0 + pos0, cnt0);
}
@@ -321,7 +328,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
int pos0 = EnsureWriteCapacityAndShift(cnt);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
WriteDoubleArray0(val, data0 + pos0, cnt);
}
@@ -334,7 +341,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
int pos0 = EnsureReadCapacityAndShift(cnt0);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
return ReadDoubleArray0(cnt, data0 + pos0, cnt0);
}
@@ -347,7 +354,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
int written;
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
written = WriteString0(chars, charCnt, byteCnt, encoding, data0 + pos0);
}
@@ -360,7 +367,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
{
EnsureWriteCapacity(Pos + cnt);
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
WriteInternal(src, cnt, data0);
}
@@ -371,30 +378,30 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
/** <inheritdoc /> */
public override void Read(byte* dest, int cnt)
{
- fixed (byte* data0 = Data)
+ fixed (byte* data0 = _data)
{
- ReadInternal(dest, cnt, data0);
+ ReadInternal(data0, dest, cnt);
}
}
/** <inheritdoc /> */
- public override int Remaining()
+ public override int Remaining
{
- return Data.Length - Pos;
+ get { return _data.Length - Pos; }
}
/** <inheritdoc /> */
- public override byte[] Array()
+ public override byte[] GetArray()
{
- return Data;
+ return _data;
}
/** <inheritdoc /> */
- public override byte[] ArrayCopy()
+ public override byte[] GetArrayCopy()
{
byte[] copy = new byte[Pos];
- Buffer.BlockCopy(Data, 0, copy, 0, Pos);
+ Buffer.BlockCopy(_data, 0, copy, 0, Pos);
return copy;
}
@@ -402,7 +409,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
/** <inheritdoc /> */
public override bool IsSameArray(byte[] arr)
{
- return Data == arr;
+ return _data == arr;
}
/** <inheritdoc /> */
@@ -416,32 +423,32 @@ namespace Apache.Ignite.Core.Impl.Portable.IO
/// </summary>
internal byte[] InternalArray
{
- get { return Data; }
+ get { return _data; }
}
/** <inheritdoc /> */
protected override void EnsureWriteCapacity(int cnt)
{
- if (cnt > Data.Length)
+ if (cnt > _data.Length)
{
- int newCap = Capacity(Data.Length, cnt);
+ int newCap = Capacity(_data.Length, cnt);
byte[] data0 = new byte[newCap];
// Copy the whole initial array length here because it can be changed
// from Java without position adjusting.
- Buffer.BlockCopy(Data, 0, data0, 0, Data.Length);
+ Buffer.BlockCopy(_data, 0, data0, 0, _data.Length);
- Data = data0;
+ _data = data0;
}
}
/** <inheritdoc /> */
protected override void EnsureReadCapacity(int cnt)
{
- if (Data.Length - Pos < cnt)
+ if (_data.Length - Pos < cnt)
throw new EndOfStreamException("Not enough data in stream [expected=" + cnt +
- ", remaining=" + (Data.Length - Pos) + ']');
+ ", remaining=" + (_data.Length - Pos) + ']');
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs
index 9767037..08a1d00 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs
@@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Impl.Portable
using System.Diagnostics;
using System.IO;
using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Portable.IO;
using Apache.Ignite.Core.Impl.Portable.Metadata;
using Apache.Ignite.Core.Portable;
@@ -357,9 +358,9 @@ namespace Apache.Ignite.Core.Impl.Portable
inStream.Seek(_obj.Offset, SeekOrigin.Begin);
// Assume that resulting length will be no less than header + [fields_cnt] * 12;
- int len = PortableUtils.FullHdrLen + (_vals == null ? 0 : _vals.Count * 12);
+ int estimatedCapacity = PortableObjectHeader.Size + (_vals == null ? 0 : _vals.Count*12);
- PortableHeapStream outStream = new PortableHeapStream(len);
+ PortableHeapStream outStream = new PortableHeapStream(estimatedCapacity);
PortableWriterImpl writer = _portables.Marshaller.StartMarshal(outStream);
@@ -377,8 +378,8 @@ namespace Apache.Ignite.Core.Impl.Portable
_portables.Marshaller.FinishMarshal(writer);
// Create portable object once metadata is processed.
- return new PortableUserObject(_portables.Marshaller, outStream.InternalArray, 0,
- _desc.TypeId, _hashCode);
+ return new PortableUserObject(_portables.Marshaller, outStream.InternalArray, 0,
+ PortableObjectHeader.Read(outStream, 0));
}
finally
{
@@ -568,7 +569,7 @@ namespace Apache.Ignite.Core.Impl.Portable
/// <param name="hash">New hash.</param>
/// <param name="vals">Values to be replaced.</param>
/// <returns>Mutated object.</returns>
- private void Mutate0(Context ctx, PortableHeapStream inStream, IPortableStream outStream,
+ private unsafe void Mutate0(Context ctx, PortableHeapStream inStream, IPortableStream outStream,
bool changeHash, int hash, IDictionary<int, PortableBuilderField> vals)
{
int inStartPos = inStream.Position;
@@ -605,13 +606,9 @@ namespace Apache.Ignite.Core.Impl.Portable
}
else if (inHdr == PortableUtils.HdrFull)
{
- PortableUtils.ValidateProtocolVersion(inStream);
-
- byte inUsrFlag = inStream.ReadByte();
- int inTypeId = inStream.ReadInt();
- int inHash = inStream.ReadInt();
- int inLen = inStream.ReadInt();
- int inRawOff = inStream.ReadInt();
+ var inHeader = PortableObjectHeader.Read(inStream, inStartPos);
+
+ PortableUtils.ValidateProtocolVersion(inHeader.Version);
int hndPos;
@@ -620,104 +617,106 @@ namespace Apache.Ignite.Core.Impl.Portable
// Object could be cached in parent builder.
PortableBuilderField cachedVal;
- if (_parent._cache != null && _parent._cache.TryGetValue(inStartPos, out cachedVal)) {
+ if (_parent._cache != null && _parent._cache.TryGetValue(inStartPos, out cachedVal))
+ {
WriteField(ctx, cachedVal);
}
else
{
// New object, write in full form.
- outStream.WriteByte(PortableUtils.HdrFull);
- outStream.WriteByte(PortableUtils.ProtoVer);
- outStream.WriteByte(inUsrFlag);
- outStream.WriteInt(inTypeId);
- outStream.WriteInt(changeHash ? hash : inHash);
+ var inSchema = inHeader.ReadSchema(inStream, inStartPos);
- // Skip length and raw offset as they are not known at this point.
- outStream.Seek(8, SeekOrigin.Current);
+ var outSchemaLen = vals.Count + (inSchema == null ? 0 : inSchema.Length);
+ var outSchema = outSchemaLen > 0
+ ? new ResizeableArray<PortableObjectSchemaField>(outSchemaLen)
+ : null;
- // Write regular fields.
- while (inStream.Position < inStartPos + inRawOff)
+ // Skip header as it is not known at this point.
+ outStream.Seek(PortableObjectHeader.Size, SeekOrigin.Current);
+
+ if (inSchema != null)
{
- int inFieldId = inStream.ReadInt();
- int inFieldLen = inStream.ReadInt();
- int inFieldDataPos = inStream.Position;
+ foreach (var inField in inSchema)
+ {
+ PortableBuilderField fieldVal;
- PortableBuilderField fieldVal;
+ var fieldFound = vals.TryGetValue(inField.Id, out fieldVal);
- bool fieldFound = vals.TryGetValue(inFieldId, out fieldVal);
+ if (fieldFound && fieldVal == PortableBuilderField.RmvMarker)
+ continue;
- if (!fieldFound || fieldVal != PortableBuilderField.RmvMarker)
- {
- outStream.WriteInt(inFieldId);
+ // ReSharper disable once PossibleNullReferenceException (can't be null)
+ outSchema.Add(new PortableObjectSchemaField(inField.Id, outStream.Position - outStartPos));
- int fieldLenPos = outStream.Position; // Here we will write length later.
-
- outStream.Seek(4, SeekOrigin.Current);
+ if (!fieldFound)
+ fieldFound = _parent._cache != null &&
+ _parent._cache.TryGetValue(inField.Offset + inStartPos, out fieldVal);
if (fieldFound)
{
- // Replace field with new value.
- if (fieldVal != PortableBuilderField.RmvMarker)
- WriteField(ctx, fieldVal);
+ WriteField(ctx, fieldVal);
- vals.Remove(inFieldId);
+ vals.Remove(inField.Id);
}
else
{
- // If field was requested earlier, then we must write tracked value
- if (_parent._cache != null && _parent._cache.TryGetValue(inFieldDataPos, out fieldVal))
- WriteField(ctx, fieldVal);
- else
- // Field is not tracked, re-write as is.
- Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);
- }
-
- int fieldEndPos = outStream.Position;
+ // Field is not tracked, re-write as is.
+ inStream.Seek(inField.Offset + inStartPos, SeekOrigin.Begin);
- outStream.Seek(fieldLenPos, SeekOrigin.Begin);
- outStream.WriteInt(fieldEndPos - fieldLenPos - 4);
- outStream.Seek(fieldEndPos, SeekOrigin.Begin);
+ Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);
+ }
}
-
- // Position intput stream pointer after the field.
- inStream.Seek(inFieldDataPos + inFieldLen, SeekOrigin.Begin);
}
// Write remaining new fields.
foreach (var valEntry in vals)
{
- if (valEntry.Value != PortableBuilderField.RmvMarker)
- {
- outStream.WriteInt(valEntry.Key);
+ if (valEntry.Value == PortableBuilderField.RmvMarker)
+ continue;
+
+ // ReSharper disable once PossibleNullReferenceException (can't be null)
+ outSchema.Add(new PortableObjectSchemaField(valEntry.Key, outStream.Position - outStartPos));
+
+ WriteField(ctx, valEntry.Value);
+ }
- int fieldLenPos = outStream.Position; // Here we will write length later.
+ if (outSchema != null && outSchema.Count == 0)
+ outSchema = null;
- outStream.Seek(4, SeekOrigin.Current);
+ // Write raw data.
+ int outRawOff = outStream.Position - outStartPos;
- WriteField(ctx, valEntry.Value);
+ int inRawOff = inHeader.GetRawOffset(inStream, inStartPos);
+ int inRawLen = inHeader.SchemaOffset - inRawOff;
- int fieldEndPos = outStream.Position;
+ if (inRawLen > 0)
+ outStream.Write(inStream.InternalArray, inStartPos + inRawOff, inRawLen);
- outStream.Seek(fieldLenPos, SeekOrigin.Begin);
- outStream.WriteInt(fieldEndPos - fieldLenPos - 4);
- outStream.Seek(fieldEndPos, SeekOrigin.Begin);
- }
+ // Write schema
+ int outSchemaOff = outRawOff;
+
+ if (outSchema != null)
+ {
+ outSchemaOff = outStream.Position - outStartPos;
+
+ PortableObjectSchemaField.WriteArray(outSchema.Array, outStream, outSchema.Count);
+
+ if (inRawLen > 0)
+ outStream.WriteInt(outRawOff);
}
- // Write raw data.
- int rawPos = outStream.Position;
+ var outSchemaId = PortableUtils.GetSchemaId(outSchema);
- outStream.Write(inStream.InternalArray, inStartPos + inRawOff, inLen - inRawOff);
+ var outLen = outStream.Position - outStartPos;
- // Write length and raw data offset.
- int outResPos = outStream.Position;
+ var outHash = changeHash ? hash : inHeader.HashCode;
- outStream.Seek(outStartPos + PortableUtils.OffsetLen, SeekOrigin.Begin);
+ var outHeader = new PortableObjectHeader(inHeader.IsUserType, inHeader.TypeId, outHash,
+ outLen, outSchemaId, outSchemaOff, outSchema == null);
- outStream.WriteInt(outResPos - outStartPos); // Length.
- outStream.WriteInt(rawPos - outStartPos); // Raw offset.
+ PortableObjectHeader.Write(outHeader, outStream, outStartPos);
- outStream.Seek(outResPos, SeekOrigin.Begin);
+ outStream.Seek(outStartPos + outLen, SeekOrigin.Begin); // seek to the end of the object
}
}
else
@@ -728,7 +727,7 @@ namespace Apache.Ignite.Core.Impl.Portable
}
// Synchronize input stream position.
- inStream.Seek(inStartPos + inLen, SeekOrigin.Begin);
+ inStream.Seek(inStartPos + inHeader.Length, SeekOrigin.Begin);
}
else
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableFullTypeDescriptor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableFullTypeDescriptor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableFullTypeDescriptor.cs
index 312cefa..8695a3e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableFullTypeDescriptor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableFullTypeDescriptor.cs
@@ -63,6 +63,9 @@ namespace Apache.Ignite.Core.Impl.Portable
/** Type structure. */
private volatile PortableStructure _readerTypeStructure = PortableStructure.CreateEmpty();
+
+ /** Type schema. */
+ private readonly PortableObjectSchema _schema = new PortableObjectSchema();
/// <summary>
/// Constructor.
@@ -212,5 +215,11 @@ namespace Apache.Ignite.Core.Impl.Portable
_readerTypeStructure = _readerTypeStructure.Merge(exp, pathIdx, updates);
}
}
+
+ /** <inheritDoc /> */
+ public PortableObjectSchema Schema
+ {
+ get { return _schema; }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
index a8d7058..5ea7a55 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs
@@ -119,7 +119,7 @@ namespace Apache.Ignite.Core.Impl.Portable
Marshal(val, stream);
- return stream.ArrayCopy();
+ return stream.GetArrayCopy();
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableObjectHeader.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableObjectHeader.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableObjectHeader.cs
new file mode 100644
index 0000000..b3768a0
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableObjectHeader.cs
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Portable
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.IO;
+ using System.Runtime.InteropServices;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+
+ /// <summary>
+ /// Portable object header structure.
+ /// </summary>
+ [StructLayout(LayoutKind.Sequential)]
+ internal struct PortableObjectHeader : IEquatable<PortableObjectHeader>
+ {
+ /** Size, equals to sizeof(PortableObjectHeader) */
+ public const int Size = 24;
+
+ /** User type flag */
+ private const int FlagUserType = 0x1;
+
+ /** Raw only flag */
+ private const int FlagRawOnly = 0x2;
+
+ /*
+ * Actual header layout. The struct uses LayoutKind.Sequential and the static Write method
+ * copies its bytes to the stream verbatim on little-endian platforms, so on those platforms
+ * the declaration order below is the order in which the fields appear in the stream.
+ * Field sizes add up to Size: 1 + 1 + 2 + 5 * 4 = 24 bytes.
+ */
+ public readonly byte Header; // Header code, always 103 (HdrFull)
+ public readonly byte Version; // Protocol version
+ public readonly short Flags; // Flags
+ public readonly int TypeId; // Type ID
+ public readonly int HashCode; // Hash code
+ public readonly int Length; // Length, including header
+ public readonly int SchemaId; // Schema ID (Fnv1 of field type ids)
+ public readonly int SchemaOffset; // Schema offset, or raw offset when RawOnly flag is set.
+
+ /// <summary>
+ /// Creates a full-form object header from its individual attributes.
+ /// </summary>
+ /// <param name="userType">User type flag.</param>
+ /// <param name="typeId">Type ID.</param>
+ /// <param name="hashCode">Hash code.</param>
+ /// <param name="length">Length, including header.</param>
+ /// <param name="schemaId">Schema ID.</param>
+ /// <param name="schemaOffset">Schema offset, or raw offset when <paramref name="rawOnly"/> is set.</param>
+ /// <param name="rawOnly">Raw-only flag.</param>
+ public PortableObjectHeader(bool userType, int typeId, int hashCode, int length, int schemaId, int schemaOffset, bool rawOnly)
+ {
+ // The offset must point past the header and not beyond the end of the object.
+ Debug.Assert(schemaOffset <= length);
+ Debug.Assert(schemaOffset >= Size);
+
+ // Combine the individual boolean attributes into the flags bit set.
+ var flags = 0;
+
+ if (userType)
+ flags |= FlagUserType;
+
+ if (rawOnly)
+ flags |= FlagRawOnly;
+
+ Header = PortableUtils.HdrFull;
+ Version = PortableUtils.ProtoVer;
+ Flags = (short) flags;
+ TypeId = typeId;
+ HashCode = hashCode;
+ Length = length;
+ SchemaId = schemaId;
+ SchemaOffset = schemaOffset;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="PortableObjectHeader"/> struct by reading
+ /// it field by field from the current position of the specified stream.
+ /// </summary>
+ /// <remarks>
+ /// Fields are read in struct declaration order (type ID, hash code, length), which is the
+ /// byte order produced when the struct is copied to a stream verbatim. Reading length before
+ /// type ID would make this path disagree with the raw-copy path.
+ /// </remarks>
+ /// <param name="stream">The stream.</param>
+ private PortableObjectHeader(IPortableStream stream)
+ {
+ Header = stream.ReadByte();
+ Version = stream.ReadByte();
+ Flags = stream.ReadShort();
+ TypeId = stream.ReadInt();
+ HashCode = stream.ReadInt();
+ Length = stream.ReadInt();
+ SchemaId = stream.ReadInt();
+ SchemaOffset = stream.ReadInt();
+ }
+
+ /// <summary>
+ /// Writes this instance field by field to the current position of the specified stream.
+ /// </summary>
+ /// <remarks>
+ /// Fields are written in struct declaration order (type ID, hash code, length) so that the
+ /// output matches the bytes produced when the struct is copied to a stream verbatim.
+ /// </remarks>
+ /// <param name="stream">The stream.</param>
+ private void Write(IPortableStream stream)
+ {
+ stream.WriteByte(Header);
+ stream.WriteByte(Version);
+ stream.WriteShort(Flags);
+ stream.WriteInt(TypeId);
+ stream.WriteInt(HashCode);
+ stream.WriteInt(Length);
+ stream.WriteInt(SchemaId);
+ stream.WriteInt(SchemaOffset);
+ }
+
+ /// <summary>
+ /// Gets a value indicating whether the user type bit is set in <see cref="Flags"/>.
+ /// </summary>
+ public bool IsUserType
+ {
+ get { return (Flags & FlagUserType) != 0; }
+ }
+
+ /// <summary>
+ /// Gets a value indicating whether the raw-only bit is set in <see cref="Flags"/>.
+ /// </summary>
+ public bool IsRawOnly
+ {
+ get { return (Flags & FlagRawOnly) != 0; }
+ }
+
+ /// <summary>
+ /// Gets a value indicating whether the object carries a raw offset in its last 4 bytes.
+ /// </summary>
+ public bool HasRawOffset
+ {
+ get
+ {
+ if (IsRawOnly)
+ return false;
+
+ // Count the 4-byte words between the schema start and the object end. An odd count
+ // means the very last word is the raw offset rather than part of a schema record.
+ var footerWords = (Length - SchemaOffset) >> 2;
+
+ return (footerWords & 0x1) == 0x1;
+ }
+ }
+
+ /// <summary>
+ /// Gets the number of field records in the schema; zero for raw-only objects.
+ /// </summary>
+ public int SchemaFieldCount
+ {
+ get
+ {
+ if (IsRawOnly)
+ return 0;
+
+ var footerSize = Length - SchemaOffset;
+
+ // Exclude the trailing 4-byte raw offset, if present, before counting records.
+ var recordsSize = HasRawOffset ? footerSize - 4 : footerSize;
+
+ return recordsSize >> 3; // 8 == PortableObjectSchemaField.Size
+ }
+ }
+
+ /// <summary>
+ /// Gets the position where the schema ends: the object end, minus the trailing
+ /// 4-byte raw offset when one is present.
+ /// </summary>
+ /// <param name="position">Position of the object start.</param>
+ /// <returns>Schema end position.</returns>
+ public int GetSchemaEnd(int position)
+ {
+ var objectEnd = position + Length;
+
+ return HasRawOffset ? objectEnd - 4 : objectEnd;
+ }
+
+ /// <summary>
+ /// Gets the position where the schema starts. For raw-only objects this is the same
+ /// as the schema end.
+ /// </summary>
+ /// <param name="position">Position of the object start.</param>
+ /// <returns>Schema start position.</returns>
+ public int GetSchemaStart(int position)
+ {
+ if (IsRawOnly)
+ return GetSchemaEnd(position);
+
+ return position + SchemaOffset;
+ }
+
+ /// <summary>
+ /// Gets the raw data offset of the object located at the given position in the stream.
+ /// </summary>
+ /// <param name="stream">The stream; it is repositioned when the offset has to be read from it.</param>
+ /// <param name="position">Position of the object start.</param>
+ /// <returns>Raw offset.</returns>
+ public int GetRawOffset(IPortableStream stream, int position)
+ {
+ Debug.Assert(stream != null);
+
+ if (HasRawOffset)
+ {
+ // The raw offset is stored in the last 4 bytes of the object.
+ stream.Seek(position + Length - 4, SeekOrigin.Begin);
+
+ return stream.ReadInt();
+ }
+
+ // No explicit raw offset is stored: fall back to the offset kept in the header.
+ return SchemaOffset;
+ }
+
+ /// <summary>
+ /// Reads the schema as dictionary according to this header data.
+ /// </summary>
+ /// <param name="stream">The stream; must not be null. It is repositioned when a schema is read.</param>
+ /// <param name="position">Position that <c>SchemaOffset</c> is counted from.</param>
+ /// <returns>
+ /// Dictionary with one entry per schema record, built from the pair of ints stored
+ /// in each record, or <c>null</c> when <see cref="SchemaFieldCount"/> is zero.
+ /// </returns>
+ public Dictionary<int, int> ReadSchemaAsDictionary(IPortableStream stream, int position)
+ {
+     Debug.Assert(stream != null);
+
+     // SchemaFieldCount is already a number of records, not a size in bytes.
+     var fieldCount = SchemaFieldCount;
+
+     if (fieldCount == 0)
+         return null;
+
+     stream.Seek(position + SchemaOffset, SeekOrigin.Begin);
+
+     // Pre-size for every record so the dictionary never has to grow while it is filled.
+     var schema = new Dictionary<int, int>(fieldCount);
+
+     for (var i = 0; i < fieldCount; i++)
+     {
+         // A record is two consecutive ints; the first one read is the key.
+         var key = stream.ReadInt();
+         var val = stream.ReadInt();
+
+         schema.Add(key, val);
+     }
+
+     return schema;
+ }
+
+ /// <summary>
+ /// Reads the schema according to this header data.
+ /// </summary>
+ /// <param name="stream">The stream; must not be null. It is repositioned when a schema is read.</param>
+ /// <param name="position">Position that <c>SchemaOffset</c> is counted from.</param>
+ /// <returns>
+ /// Result of <c>PortableObjectSchemaField.ReadArray</c> for <see cref="SchemaFieldCount"/>
+ /// records, or <c>null</c> when that count is zero.
+ /// </returns>
+ public PortableObjectSchemaField[] ReadSchema(IPortableStream stream, int position)
+ {
+     Debug.Assert(stream != null);
+
+     var fieldCount = SchemaFieldCount;
+
+     if (fieldCount != 0)
+     {
+         var schemaPos = position + SchemaOffset;
+
+         stream.Seek(schemaPos, SeekOrigin.Begin);
+
+         return PortableObjectSchemaField.ReadArray(stream, fieldCount);
+     }
+
+     return null;
+ }
+
+ /// <summary>
+ /// Writes specified header to a stream at the given position.
+ /// </summary>
+ /// <param name="header">The header.</param>
+ /// <param name="stream">The stream; must not be null.</param>
+ /// <param name="position">Non-negative position to seek to before writing.</param>
+ public static unsafe void Write(PortableObjectHeader header, IPortableStream stream, int position)
+ {
+     Debug.Assert(stream != null);
+     Debug.Assert(position >= 0);
+
+     stream.Seek(position, SeekOrigin.Begin);
+
+     if (!BitConverter.IsLittleEndian)
+     {
+         // Not little-endian: write the header one field at a time.
+         header.Write(stream);
+
+         return;
+     }
+
+     // Little-endian: copy the struct's bytes to the stream in a single call.
+     stream.Write((byte*) &header, Size);
+ }
+
+ /// <summary>
+ /// Reads a header from the stream at the given position.
+ /// </summary>
+ /// <param name="stream">The stream; must not be null.</param>
+ /// <param name="position">Non-negative position to seek to before reading.</param>
+ /// <returns>Instance of the header.</returns>
+ public static unsafe PortableObjectHeader Read(IPortableStream stream, int position)
+ {
+     Debug.Assert(stream != null);
+     Debug.Assert(position >= 0);
+
+     stream.Seek(position, SeekOrigin.Begin);
+
+     // Not little-endian: let the stream-reading constructor build the header.
+     if (!BitConverter.IsLittleEndian)
+         return new PortableObjectHeader(stream);
+
+     // Little-endian: fill the struct's bytes straight from the stream.
+     var result = new PortableObjectHeader();
+
+     stream.Read((byte*) &result, Size);
+
+     // Debug-only sanity checks, performed on this path only.
+     Debug.Assert(result.Version == PortableUtils.ProtoVer);
+     Debug.Assert(result.SchemaOffset <= result.Length);
+     Debug.Assert(result.SchemaOffset >= Size);
+
+     return result;
+ }
+
+ /** <inheritdoc /> */
+ public bool Equals(PortableObjectHeader other)
+ {
+     // Two headers are equal only when all eight fields match.
+     if (Header != other.Header || Version != other.Version || Flags != other.Flags)
+         return false;
+
+     if (TypeId != other.TypeId || HashCode != other.HashCode || Length != other.Length)
+         return false;
+
+     return SchemaId == other.SchemaId && SchemaOffset == other.SchemaOffset;
+ }
+
+ /** <inheritdoc /> */
+ public override bool Equals(object obj)
+ {
+     // Yields no value for null and for any object that is not a boxed header.
+     var other = obj as PortableObjectHeader?;
+
+     return other.HasValue && Equals(other.Value);
+ }
+
+ /** <inheritdoc /> */
+ public override int GetHashCode()
+ {
+     // Arithmetic overflow while mixing is expected and must wrap silently.
+     unchecked
+     {
+         // Fold in the same fields, in the same order, that Equals compares.
+         var hash = Header.GetHashCode();
+
+         hash *= 397;
+         hash ^= Version.GetHashCode();
+
+         hash *= 397;
+         hash ^= Flags.GetHashCode();
+
+         hash *= 397;
+         hash ^= TypeId;
+
+         hash *= 397;
+         hash ^= HashCode;
+
+         hash *= 397;
+         hash ^= Length;
+
+         hash *= 397;
+         hash ^= SchemaId;
+
+         hash *= 397;
+         hash ^= SchemaOffset;
+
+         return hash;
+     }
+ }
+
+ /// <summary>
+ /// Equality operator; delegates to <see cref="Equals(PortableObjectHeader)"/>.
+ /// </summary>
+ /// <param name="left">Left operand.</param>
+ /// <param name="right">Right operand.</param>
+ /// <returns><c>true</c> when <paramref name="left"/> equals <paramref name="right"/>.</returns>
+ public static bool operator ==(PortableObjectHeader left, PortableObjectHeader right)
+ {
+     var same = left.Equals(right);
+
+     return same;
+ }
+
+ /// <summary>
+ /// Inequality operator; the negation of <see cref="Equals(PortableObjectHeader)"/>.
+ /// </summary>
+ /// <param name="left">Left operand.</param>
+ /// <param name="right">Right operand.</param>
+ /// <returns><c>true</c> when <paramref name="left"/> does not equal <paramref name="right"/>.</returns>
+ public static bool operator !=(PortableObjectHeader left, PortableObjectHeader right)
+ {
+     var same = left.Equals(right);
+
+     return !same;
+ }
+ }
+}