You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/09 15:17:48 UTC

[4/6] ignite git commit: IGNITE-2702: .NET: Implemented compact footers optimization for binary serialization. This closes #523.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectSchemaSerializer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectSchemaSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectSchemaSerializer.cs
new file mode 100644
index 0000000..da86c07
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectSchemaSerializer.cs
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Binary
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.IO;
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Impl.Binary.IO;
+
+    /// <summary>
+    /// Schema reader/writer.
+    /// </summary>
+    internal static class BinaryObjectSchemaSerializer
+    {
+        /// <summary>
+        /// Builds a lookup from field id to field offset out of an array of schema fields.
+        /// </summary>
+        /// <param name="fields">Schema fields; may be null.</param>
+        /// <returns>Map keyed by field id, or null when <paramref name="fields"/> is null.</returns>
+        public static Dictionary<int, int> ToDictionary(this BinaryObjectSchemaField[] fields)
+        {
+            if (fields == null)
+                return null;
+
+            var offsetsById = new Dictionary<int, int>(fields.Length);
+
+            for (var idx = 0; idx < fields.Length; idx++)
+                offsetsById[fields[idx].Id] = fields[idx].Offset;   // a repeated id keeps the last offset
+
+            return offsetsById;
+        }
+
+        /// <summary>
+        /// Reads the schema described by the header; compact-footer field ids come from the schema cache or Ignite.
+        /// </summary>
+        /// <param name="stream">The stream.</param>
+        /// <param name="position">Position that the header schema offset is added to.</param>
+        /// <param name="hdr">The header.</param>
+        /// <param name="schema">The schema cache that is consulted first for field ids.</param>
+        /// <param name="marsh">The marshaller.</param>
+        /// <returns>Schema fields, or null when the header reports no schema fields.</returns>
+        public static BinaryObjectSchemaField[] ReadSchema(IBinaryStream stream, int position, BinaryObjectHeader hdr,
+            BinaryObjectSchema schema, Marshaller marsh)
+        {
+            Debug.Assert(stream != null);
+            Debug.Assert(schema != null);
+            Debug.Assert(marsh != null);
+
+            // Deferred: the ids are only resolved if the footer turns out to be compact.
+            Func<int[]> resolveFieldIds = () => GetFieldIds(hdr, schema, marsh);
+            return ReadSchema(stream, position, hdr, resolveFieldIds);
+        }
+
+        /// <summary>
+        /// Reads the schema footer located at <paramref name="position"/> plus the header schema offset.
+        /// Field ids are read from the stream, or taken from <paramref name="fieldIdsFunc"/> for compact footers.
+        /// </summary>
+        /// <param name="stream">The stream; it is repositioned to the footer and read forward.</param>
+        /// <param name="position">Position that the header schema offset is added to.</param>
+        /// <param name="hdr">The header.</param>
+        /// <param name="fieldIdsFunc">The field ids function; invoked only for compact footers.</param>
+        /// <returns>Schema fields, or null when the header reports no schema fields.</returns>
+        public static BinaryObjectSchemaField[] ReadSchema(IBinaryStream stream, int position, BinaryObjectHeader hdr,
+            Func<int[]> fieldIdsFunc)
+        {
+            Debug.Assert(stream != null);
+            Debug.Assert(fieldIdsFunc != null);
+
+            var schemaSize = hdr.SchemaFieldCount;
+
+            if (schemaSize == 0)
+                return null;
+
+            stream.Seek(position + hdr.SchemaOffset, SeekOrigin.Begin);
+
+            var res = new BinaryObjectSchemaField[schemaSize];
+
+            var offsetSize = hdr.SchemaFieldOffsetSize;
+
+            if (hdr.IsCompactFooter)
+            {
+                var fieldIds = fieldIdsFunc();
+
+                Debug.Assert(fieldIds.Length == schemaSize);
+
+                if (offsetSize == 1)
+                {
+                    for (var i = 0; i < schemaSize; i++)
+                        res[i] = new BinaryObjectSchemaField(fieldIds[i], stream.ReadByte());
+                }
+                else if (offsetSize == 2)
+                {
+                    // Written as unsigned 16-bit (up to ushort.MaxValue): read unsigned to avoid sign extension.
+                    for (var i = 0; i < schemaSize; i++)
+                        res[i] = new BinaryObjectSchemaField(fieldIds[i], unchecked((ushort) stream.ReadShort()));
+                }
+                else
+                {
+                    for (var i = 0; i < schemaSize; i++)
+                        res[i] = new BinaryObjectSchemaField(fieldIds[i], stream.ReadInt());
+                }
+            }
+            else
+            {
+                if (offsetSize == 1)
+                {
+                    for (var i = 0; i < schemaSize; i++)
+                        res[i] = new BinaryObjectSchemaField(stream.ReadInt(), stream.ReadByte());
+                }
+                else if (offsetSize == 2)
+                {
+                    // Same as above: two-byte offsets above short.MaxValue must not come back negative.
+                    for (var i = 0; i < schemaSize; i++)
+                        res[i] = new BinaryObjectSchemaField(stream.ReadInt(), unchecked((ushort) stream.ReadShort()));
+                }
+                else
+                {
+                    for (var i = 0; i < schemaSize; i++)
+                        res[i] = new BinaryObjectSchemaField(stream.ReadInt(), stream.ReadInt());
+                }
+            }
+
+            return res;
+        }
+
+        /// <summary>
+        /// Writes <paramref name="count"/> fields starting at <paramref name="offset"/> to a stream, using
+        /// the narrowest offset width (1, 2 or 4 bytes) that fits the offset of the last field written.
+        /// </summary>
+        /// <param name="fields">Fields.</param>
+        /// <param name="stream">Stream.</param>
+        /// <param name="offset">Offset in the array.</param>
+        /// <param name="count">Field count to write.</param>
+        /// <param name="compact">Compact mode without field ids.</param>
+        /// <returns><see cref="BinaryObjectHeader.Flag.OffsetOneByte" />,
+        /// <see cref="BinaryObjectHeader.Flag.OffsetTwoBytes" />, or
+        /// <see cref="BinaryObjectHeader.Flag.None" /> when offsets are written as four bytes.</returns>
+        public static unsafe BinaryObjectHeader.Flag WriteSchema(BinaryObjectSchemaField[] fields, IBinaryStream stream, int offset,
+            int count, bool compact)
+        {
+            Debug.Assert(fields != null);
+            Debug.Assert(stream != null);
+            Debug.Assert(count > 0);
+            Debug.Assert(offset >= 0);
+            Debug.Assert(offset + count <= fields.Length);
+
+            unchecked
+            {
+                // Last field is the farthest in the stream
+                var maxFieldOffset = fields[offset + count - 1].Offset;
+
+                if (compact)
+                {
+                    if (maxFieldOffset <= byte.MaxValue)
+                    {
+                        for (int i = offset; i < count + offset; i++)
+                            stream.WriteByte((byte)fields[i].Offset);
+
+                        return BinaryObjectHeader.Flag.OffsetOneByte;
+                    }
+
+                    if (maxFieldOffset <= ushort.MaxValue)
+                    {
+                        for (int i = offset; i < count + offset; i++)
+                            stream.WriteShort((short)fields[i].Offset);
+
+                        return BinaryObjectHeader.Flag.OffsetTwoBytes;
+                    }
+
+                    for (int i = offset; i < count + offset; i++)
+                        stream.WriteInt(fields[i].Offset);
+                }
+                else
+                {
+                    if (maxFieldOffset <= byte.MaxValue)
+                    {
+                        for (int i = offset; i < count + offset; i++)
+                        {
+                            var field = fields[i];
+
+                            stream.WriteInt(field.Id);
+                            stream.WriteByte((byte)field.Offset);
+                        }
+
+                        return BinaryObjectHeader.Flag.OffsetOneByte;
+                    }
+
+                    if (maxFieldOffset <= ushort.MaxValue)
+                    {
+                        for (int i = offset; i < count + offset; i++)
+                        {
+                            var field = fields[i];
+
+                            stream.WriteInt(field.Id);
+                            stream.WriteShort((short)field.Offset);
+                        }
+
+                        return BinaryObjectHeader.Flag.OffsetTwoBytes;
+                    }
+
+                    if (BitConverter.IsLittleEndian)
+                    {
+                        // Bulk copy of the fields; the length is in bytes, i.e. field count * struct size.
+                        // NOTE(review): relies on the struct being laid out as (Id, Offset) - confirm.
+                        fixed (BinaryObjectSchemaField* ptr = &fields[offset])
+                        {
+                            stream.Write((byte*)ptr, count * BinaryObjectSchemaField.Size);
+                        }
+                    }
+                    else
+                    {
+                        for (int i = offset; i < count + offset; i++)
+                        {
+                            var field = fields[i];
+
+                            stream.WriteInt(field.Id);
+                            stream.WriteInt(field.Offset);
+                        }
+                    }
+                }
+
+                return BinaryObjectHeader.Flag.None;
+            }
+        }
+
+        /// <summary>
+        /// Resolves field ids for the header's schema: local schema cache first, then the Ignite instance.
+        /// </summary>
+        private static int[] GetFieldIds(BinaryObjectHeader hdr, BinaryObjectSchema schema, Marshaller marsh)
+        {
+            var cached = schema.Get(hdr.SchemaId);
+            if (cached != null)
+                return cached;
+
+            var ignite = marsh.Ignite;
+            var remote = ignite == null ? null : ignite.ClusterGroup.GetSchema(hdr.TypeId, hdr.SchemaId);
+
+            if (remote != null)
+                return remote;
+
+            throw new BinaryObjectException("Cannot find schema for object with compact footer [" +
+                                            "typeId=" + hdr.TypeId + ", schemaId=" + hdr.SchemaId + ']');
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
index 16aae93..21c1642 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
@@ -718,16 +718,7 @@ namespace Apache.Ignite.Core.Impl.Binary
                     // Set new frame.
                     _curHdr = hdr;
                     _curPos = pos;
-                    
-                    _curSchema = desc.Schema.Get(hdr.SchemaId);
-
-                    if (_curSchema == null)
-                    {
-                        _curSchema = ReadSchema();
-
-                        desc.Schema.Add(hdr.SchemaId, _curSchema);
-                    }
-
+                    SetCurSchema(desc);
                     _curStruct = new BinaryStructureTracker(desc, desc.ReaderTypeStructure);
                     _curRaw = false;
 
@@ -790,10 +781,40 @@ namespace Apache.Ignite.Core.Impl.Binary
         }
 
         /// <summary>
+        /// Sets the current schema.
+        /// </summary>
+        private void SetCurSchema(IBinaryTypeDescriptor desc)
+        {
+            // Objects without a schema must not keep the schema of a previously read object.
+            _curSchema = _curHdr.HasSchema ? desc.Schema.Get(_curHdr.SchemaId) : null;
+
+            if (_curSchema != null || !_curHdr.HasSchema)
+                return;
+
+            // Not cached in the descriptor yet: obtain the schema once and
+            // remember it under its schema id for subsequent reads.
+            _curSchema = ReadSchema();
+
+            desc.Schema.Add(_curHdr.SchemaId, _curSchema);
+        }
+
+        /// <summary>
         /// Reads the schema.
         /// </summary>
         private int[] ReadSchema()
         {
+            if (_curHdr.IsCompactFooter)
+            {
+                // Get schema from Java
+                var schema = Marshaller.Ignite.ClusterGroup.GetSchema(_curHdr.TypeId, _curHdr.SchemaId);
+
+                if (schema == null)
+                    throw new BinaryObjectException("Cannot find schema for object with compact footer [" +
+                        "typeId=" + _curHdr.TypeId + ", schemaId=" + _curHdr.SchemaId + ']');
+
+                return schema;
+            }
+
             Stream.Seek(_curPos + _curHdr.SchemaOffset, SeekOrigin.Begin);
 
             var count = _curHdr.SchemaFieldCount;
@@ -929,9 +950,10 @@ namespace Apache.Ignite.Core.Impl.Binary
 
             if (_curSchema == null || actionId >= _curSchema.Length || fieldId != _curSchema[actionId])
             {
-                _curSchema = null; // read order is different, ignore schema for future reads
+                _curSchemaMap = _curSchemaMap ?? BinaryObjectSchemaSerializer.ReadSchema(Stream, _curPos, _curHdr,
+                                    () => _curSchema).ToDictionary();
 
-                _curSchemaMap = _curSchemaMap ?? _curHdr.ReadSchemaAsDictionary(Stream, _curPos);
+                _curSchema = null; // read order is different, ignore schema for future reads
 
                 int pos;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
index 5b1273e..47bc2b6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
@@ -1137,6 +1137,9 @@ namespace Apache.Ignite.Core.Impl.Binary
                         ? BinaryObjectHeader.Flag.UserType
                         : BinaryObjectHeader.Flag.None;
 
+                    if (Marshaller.CompactFooter && desc.UserType)
+                        flags |= BinaryObjectHeader.Flag.CompactFooter;
+
                     var hasSchema = _schema.WriteSchema(_stream, schemaIdx, out schemaId, ref flags);
 
                     if (hasSchema)
@@ -1146,6 +1149,10 @@ namespace Apache.Ignite.Core.Impl.Binary
                         // Calculate and write header.
                         if (_curRawPos > 0)
                             _stream.WriteInt(_curRawPos - pos); // raw offset is in the last 4 bytes
+
+                        // Update schema in type descriptor
+                        if (desc.Schema.Get(schemaId) == null)
+                            desc.Schema.Add(schemaId, _schema.GetSchema(schemaIdx));
                     }
                     else
                         schemaOffset = BinaryObjectHeader.Size;
@@ -1451,18 +1458,7 @@ namespace Apache.Ignite.Core.Impl.Binary
                 BinaryType meta;
 
                 if (_metas.TryGetValue(desc.TypeId, out meta))
-                {
-                    if (fields != null)
-                    {
-                        IDictionary<string, int> existingFields = meta.GetFieldsMap();
-
-                        foreach (KeyValuePair<string, int> field in fields)
-                        {
-                            if (!existingFields.ContainsKey(field.Key))
-                                existingFields[field.Key] = field.Value;
-                        }
-                    }
-                }
+                    meta.UpdateFields(fields);
                 else
                     _metas[desc.TypeId] = new BinaryType(desc, fields);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
index 538fbcf..1a01f2c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
@@ -66,6 +66,8 @@ namespace Apache.Ignite.Core.Impl.Binary
             if (cfg == null)
                 cfg = new BinaryConfiguration();
 
+            CompactFooter = cfg.CompactFooter;
+
             if (cfg.TypeConfigurations == null)
                 cfg.TypeConfigurations = new List<BinaryTypeConfiguration>();
 
@@ -107,6 +109,11 @@ namespace Apache.Ignite.Core.Impl.Binary
         public Ignite Ignite { get; set; }
 
         /// <summary>
+        /// Gets or sets the compact footer flag.
+        /// </summary>
+        public bool CompactFooter { get; set; }
+
+        /// <summary>
         /// Marshal object.
         /// </summary>
         /// <param name="val">Value.</param>
@@ -281,15 +288,14 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// Puts the binary type metadata to Ignite.
         /// </summary>
         /// <param name="desc">Descriptor.</param>
-        /// <param name="fields">Fields.</param>
-        public void PutBinaryType(IBinaryTypeDescriptor desc, IDictionary<string, int> fields = null)
+        public void PutBinaryType(IBinaryTypeDescriptor desc)
         {
             Debug.Assert(desc != null);
 
             GetBinaryTypeHandler(desc);  // ensure that handler exists
 
             if (Ignite != null)
-                Ignite.PutBinaryTypes(new[] {new BinaryType(desc, fields)});
+                Ignite.PutBinaryTypes(new[] {new BinaryType(desc)});
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs
index 28dfb1a..cb0d3cd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs
@@ -18,6 +18,7 @@
 namespace Apache.Ignite.Core.Impl.Binary.Metadata
 {
     using System.Collections.Generic;
+    using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
     using Apache.Ignite.Core.Binary;
 
@@ -54,6 +55,9 @@ namespace Apache.Ignite.Core.Impl.Binary.Metadata
         /** Aff key field name. */
         private readonly string _affinityKeyFieldName;
 
+        /** Type descriptor. */
+        private readonly IBinaryTypeDescriptor _descriptor;
+
         /// <summary>
         /// Initializes the <see cref="BinaryType"/> class.
         /// </summary>
@@ -129,7 +133,7 @@ namespace Apache.Ignite.Core.Impl.Binary.Metadata
         public BinaryType(IBinaryTypeDescriptor desc, IDictionary<string, int> fields = null) 
             : this (desc.TypeId, desc.TypeName, fields, desc.AffinityKeyFieldName, desc.IsEnum)
         {
-            // No-op.
+            _descriptor = desc;
         }
 
         /// <summary>
@@ -211,6 +215,14 @@ namespace Apache.Ignite.Core.Impl.Binary.Metadata
         }
 
         /// <summary>
+        /// Gets the type descriptor stored by the descriptor-based constructor.
+        /// </summary>
+        public IBinaryTypeDescriptor Descriptor
+        {
+            get { return _descriptor; }
+        }
+
+        /// <summary>
         /// Gets fields map.
         /// </summary>
         /// <returns>Fields map.</returns>
@@ -218,5 +230,19 @@ namespace Apache.Ignite.Core.Impl.Binary.Metadata
         {
             return _fields ?? EmptyDict;
         }
+
+        /// <summary>
+        /// Merges the given name/value pairs into this type's fields, overwriting entries with the same name.
+        /// </summary>
+        public void UpdateFields(IDictionary<string, int> fields)
+        {
+            var hasUpdates = fields != null && fields.Count > 0;
+            if (!hasUpdates)
+                return;
+
+            Debug.Assert(_fields != null);
+            foreach (KeyValuePair<string, int> entry in fields)
+                _fields[entry.Key] = entry.Value;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
index fc673a6..e6c0005 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
@@ -96,6 +96,9 @@ namespace Apache.Ignite.Core.Impl.Cluster
         /** */
         private const int OpTopology = 14;
 
+        /** */
+        private const int OpSchema = 15;
+
         /** Initial Ignite instance. */
         private readonly Ignite _ignite;
         
@@ -570,5 +573,17 @@ namespace Apache.Ignite.Core.Impl.Cluster
                 return res;
             });
         }
+
+        /// <summary>
+        /// Gets the schema: sends the type id and schema id with <see cref="OpSchema"/> and returns the int array result.
+        /// </summary>
+        public int[] GetSchema(int typeId, int schemaId)
+        {
+            return DoOutInOp<int[]>(OpSchema, writer =>
+            {
+                writer.WriteInt(typeId);     // request payload: type id first...
+                writer.WriteInt(schemaId);   // ...then schema id
+            });
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
index 0271fa2..1735fb8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+#pragma warning disable 618   // SpringConfigUrl
 namespace Apache.Ignite.Core.Impl
 {
     using System;
@@ -135,6 +136,24 @@ namespace Apache.Ignite.Core.Impl
 
             // Set reconnected task to completed state for convenience.
             _clientReconnectTaskCompletionSource.SetResult(false);
+
+            SetCompactFooter();
+        }
+
+        /// <summary>
+        /// Copies the compact footer setting to the marshaller when a Spring config URL is set.
+        /// </summary>
+        private void SetCompactFooter()
+        {
+            if (string.IsNullOrEmpty(_cfg.SpringConfigUrl))
+                return;
+
+            // With a Spring config the .NET config is ignored (legacy mode),
+            // so the setting has to come from the Spring-provided configuration.
+            var springBinaryCfg = GetConfiguration().BinaryConfiguration;
+
+            if (springBinaryCfg != null)
+                _marsh.CompactFooter = springBinaryCfg.CompactFooter;
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
index c4258bd..26b6033 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
@@ -604,32 +604,53 @@ namespace Apache.Ignite.Core.Impl
         {
             DoOutOp(OpMeta, stream =>
             {
-                BinaryWriter metaWriter = _marsh.StartMarshal(stream);
+                BinaryWriter w = _marsh.StartMarshal(stream);
 
-                metaWriter.WriteInt(types.Count);
+                w.WriteInt(types.Count);
 
                 foreach (var meta in types)
                 {
-                    BinaryType meta0 = meta;
+                    w.WriteInt(meta.TypeId);
+                    w.WriteString(meta.TypeName);
+                    w.WriteString(meta.AffinityKeyFieldName);
 
-                    metaWriter.WriteInt(meta0.TypeId);
-                    metaWriter.WriteString(meta0.TypeName);
-                    metaWriter.WriteString(meta0.AffinityKeyFieldName);
+                    IDictionary<string, int> fields = meta.GetFieldsMap();
 
-                    IDictionary<string, int> fields = meta0.GetFieldsMap();
-
-                    metaWriter.WriteInt(fields.Count);
+                    w.WriteInt(fields.Count);
 
                     foreach (var field in fields)
                     {
-                        metaWriter.WriteString(field.Key);
-                        metaWriter.WriteInt(field.Value);
+                        w.WriteString(field.Key);
+                        w.WriteInt(field.Value);
+                    }
+
+                    w.WriteBoolean(meta.IsEnum);
+
+                    // Send schemas
+                    var desc = meta.Descriptor;
+                    Debug.Assert(desc != null);
+
+                    var count = 0;
+                    var countPos = stream.Position;
+                    w.WriteInt(0);  // Reserve for count
+
+                    foreach (var schema in desc.Schema.GetAll())
+                    {
+                        w.WriteInt(schema.Key);
+
+                        var ids = schema.Value;
+                        w.WriteInt(ids.Length);
+
+                        foreach (var id in ids)
+                            w.WriteInt(id);
+
+                        count++;
                     }
 
-                    metaWriter.WriteBoolean(meta.IsEnum);
+                    stream.WriteInt(countPos, count);
                 }
 
-                _marsh.FinishMarshal(metaWriter);
+                _marsh.FinishMarshal(w);
             });
 
             _marsh.OnBinaryTypesSent(types);