You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/04 19:53:41 UTC

[GitHub] [arrow] HashidaTKS commented on a change in pull request #10527: ARROW-6870: [C#] Add Support for Dictionary Arrays and Dictionary Encoding

HashidaTKS commented on a change in pull request #10527:
URL: https://github.com/apache/arrow/pull/10527#discussion_r663390641



##########
File path: csharp/src/Apache.Arrow/Arrays/DictionaryArray.cs
##########
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using Apache.Arrow.Types;
+
+namespace Apache.Arrow
+{
+    public class DictionaryArray : Array
+    {
+        public IArrowArray Dictionary { get; }
+        public IArrowArray Indices { get; }
+        public ArrowBuffer IndicesBuffer => Data.Buffers[1];
+
+        public DictionaryArray(IArrowType dataType, int length,
+            ArrowBuffer valueOffsetsBuffer, IArrowArray value,
+            ArrowBuffer nullBitmapBuffer, int nullCount = 0, int offset = 0)
+            : this(new ArrayData(dataType, length, nullCount, offset,
+                new[] { nullBitmapBuffer, valueOffsetsBuffer }, new[] { value.Data }, value.Data.Dictionary))
+        {
+        }
+
+        public DictionaryArray(ArrayData data) : base(data)
+        {
+            data.EnsureBufferCount(2);
+            data.EnsureDataType(ArrowTypeId.Dictionary);
+
+            var dicType = data.DataType as DictionaryType;
+            data.Dictionary.EnsureDataType(dicType.ValueType.TypeId);

Review comment:
       Please check it again.
   I added the null check in the following commit:
   
   https://github.com/apache/arrow/pull/10527/commits/8544064b944745aa757efe23e3c3b3aaac65e89a
   
   https://github.com/apache/arrow/pull/10527/files#diff-f31b887c0ac098b46742c1984588210d7f217a9f546e55255570a1d15d5cc3b3R33
   

##########
File path: csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
##########
@@ -68,7 +68,7 @@ public async Task Ctor_MemoryPool_AllocatesFromPool(bool shouldLeaveOpen)
                 ArrowStreamReader reader = new ArrowStreamReader(stream, memoryPool, shouldLeaveOpen);
                 reader.ReadNextRecordBatch();
 
-                Assert.Equal(1, memoryPool.Statistics.Allocations);
+                Assert.Equal(2, memoryPool.Statistics.Allocations);

Review comment:
       I fixed it to use `[Theory]`.

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
##########
@@ -43,13 +45,48 @@ protected override void Dispose(bool disposing)
             }
         }
 
+        protected void ReadInitialDictionaries()

Review comment:
       I fixed it.

##########
File path: csharp/src/Apache.Arrow/Ipc/MessageSerializer.cs
##########
@@ -52,14 +53,13 @@ public static Types.NumberType GetNumberType(int bitWidth, bool signed)
                                 $"{(signed ? "signed " : "unsigned")} integer.");
         }
 
-        internal static Schema GetSchema(Flatbuf.Schema schema)
+        internal static Schema GetSchema(Flatbuf.Schema schema, LazyCreator<DictionaryMemo> lazyDictionaryMemo)

Review comment:
       I fixed it.

##########
File path: csharp/test/Apache.Arrow.Tests/TestData.cs
##########
@@ -21,12 +21,13 @@ namespace Apache.Arrow.Tests
 {
     public static class TestData
     {
-        public static RecordBatch CreateSampleRecordBatch(int length)
+        //TODO: Remove the createDictionaryArray argument after all writer/reader supports DictionaryType serialization

Review comment:
       I was planning to always create a dictionary array once `ArrowFileWriter`/`ArrowFileReader` support `DictionaryType` serialization.
   But considering your feedback, I now think it is better to keep `createDictionaryArray` even after that.
   I will remove this `TODO`.

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
##########
@@ -43,13 +45,48 @@ protected override void Dispose(bool disposing)
             }
         }
 
+        protected void ReadInitialDictionaries()
+        {
+            if (HasReadInitialDictionary)
+            {
+                return;
+            }
+
+            if (_lazyDictionaryMemo.IsCreated) {
+                int fieldCount = _lazyDictionaryMemo.Instance.GetFieldCount();
+                for (int i = 0; i < fieldCount; ++i)
+                {
+                    ReadArrowObject();
+                }
+            }
+            HasReadInitialDictionary = true;
+        }
+
+        protected async ValueTask ReadInitialDictionariesAsync(CancellationToken cancellationToken = default)
+        {
+            if (HasReadInitialDictionary)
+            {
+                return;
+            }
+
+            int fieldCount = _lazyDictionaryMemo.Instance.GetFieldCount();
+            for (int i = 0; i < fieldCount; ++i)
+            {
+                await ReadArrowObjectAsync(cancellationToken).ConfigureAwait(false);
+            }
+
+            HasReadInitialDictionary = true;
+        }
+
         public override async ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken)
         {
             // TODO: Loop until a record batch is read.
             cancellationToken.ThrowIfCancellationRequested();
             return await ReadRecordBatchAsync(cancellationToken).ConfigureAwait(false);
         }
 
+

Review comment:
       I fixed it.

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
##########
@@ -27,8 +27,10 @@ internal class ArrowStreamReaderImplementation : ArrowReaderImplementation
         public Stream BaseStream { get; }
         private readonly bool _leaveOpen;
         private readonly MemoryAllocator _allocator;
+        private protected bool HasReadInitialDictionary { get; set; }
 
-        public ArrowStreamReaderImplementation(Stream stream, MemoryAllocator allocator, bool leaveOpen)
+
+        public ArrowStreamReaderImplementation(Stream stream, MemoryAllocator allocator, bool leaveOpen) : base()

Review comment:
       I fixed it.

##########
File path: csharp/src/Apache.Arrow/RecordBatch.cs
##########
@@ -28,6 +28,8 @@ public partial class RecordBatch : IDisposable
         public IEnumerable<IArrowArray> Arrays => _arrays;
         public int Length { get; }
 
+        internal IReadOnlyList<IArrowArray> _Arrays => (IReadOnlyList<IArrowArray>)_arrays;
+
         private readonly IMemoryOwner<byte> _memoryOwner;
         private readonly IList<IArrowArray> _arrays;

Review comment:
       I fixed it.

##########
File path: csharp/src/Apache.Arrow/Ipc/MessageSerializer.cs
##########
@@ -73,13 +73,27 @@ internal static Schema GetSchema(Flatbuf.Schema schema)
             return new Schema(fields, metadata, copyCollections: false);
         }
 
-        private static Field FieldFromFlatbuffer(Flatbuf.Field flatbufField)
+        private static Field FieldFromFlatbuffer(Flatbuf.Field flatbufField, LazyCreator<DictionaryMemo> lazyDictionaryMemo)
         {
             Field[] childFields = flatbufField.ChildrenLength > 0 ? new Field[flatbufField.ChildrenLength] : null;
             for (int i = 0; i < flatbufField.ChildrenLength; i++)
             {
                 Flatbuf.Field? childFlatbufField = flatbufField.Children(i);
-                childFields[i] = FieldFromFlatbuffer(childFlatbufField.Value);
+                childFields[i] = FieldFromFlatbuffer(childFlatbufField.Value, lazyDictionaryMemo);
+            }
+
+            Flatbuf.DictionaryEncoding? de = flatbufField.Dictionary;

Review comment:
       I fixed it.

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
##########
@@ -27,8 +27,10 @@ internal class ArrowStreamReaderImplementation : ArrowReaderImplementation
         public Stream BaseStream { get; }
         private readonly bool _leaveOpen;
         private readonly MemoryAllocator _allocator;
+        private protected bool HasReadInitialDictionary { get; set; }

Review comment:
       I fixed it.

##########
File path: csharp/src/Apache.Arrow/Ipc/DictionaryMemo.cs
##########
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using Apache.Arrow.Types;
+
+namespace Apache.Arrow.Ipc
+{
+    class DictionaryMemo
+    {
+        private Dictionary<long, IArrowArray> _idToDictionary;
+        private Dictionary<long, IArrowType> _idToValueType;
+        private Dictionary<Field, long> _fieldToId;
+
+        public DictionaryMemo()
+        {
+            _idToDictionary = new Dictionary<long, IArrowArray>();
+            _idToValueType = new Dictionary<long, IArrowType>();
+            _fieldToId = new Dictionary<Field, long>();
+        }
+
+        public IArrowType GetDictionaryType(long id)
+        {
+            if (!_idToValueType.TryGetValue(id, out IArrowType type))
+            {
+                throw new ArgumentException($"Dictionary with id {id} not found");
+            }
+            return type;
+        }
+
+        public IArrowArray GetDictionary(long id)
+        {
+            if (!_idToDictionary.TryGetValue(id, out IArrowArray dictionary))
+            {
+                throw new ArgumentException($"Dictionary with id {id} not found");
+            }
+            return dictionary;
+        }
+
+        public void AddField(long id, Field field)
+        {
+            if (_fieldToId.ContainsKey(field))
+            {
+                throw new ArgumentException($"Field {field.Name} is already in Memo");
+            }
+
+            if (field.DataType.TypeId != ArrowTypeId.Dictionary)
+            {
+                throw new ArgumentException($"Field type is not DictionaryType: Name={field.Name}, {field.DataType.Name}");
+            }
+
+            IArrowType valueType = ((DictionaryType)field.DataType).ValueType;
+
+            if (_idToValueType.TryGetValue(id, out IArrowType valueTypeInDic))
+            {
+                if (valueType != valueTypeInDic)
+                {
+                    throw new ArgumentException($"Field type {field.DataType.Name} does not match the existing type {valueTypeInDic})");
+                }
+            }
+
+            _fieldToId.Add(field, id);
+            _idToValueType.Add(id, valueType);
+        }
+
+        public long GetId(Field field)
+        {
+            if (!_fieldToId.TryGetValue(field, out long id))
+            {
+                throw new ArgumentException($"Field with name {field.Name} not found");
+            }
+            return id;
+        }
+
+        public long GetOrAssignId(Field field)
+        {
+            if (!_fieldToId.TryGetValue(field, out long id))
+            {
+                id = _fieldToId.Count + 1;
+                AddField(id, field);
+            }
+            return id;
+        }
+
+        public void AddOrReplaceDictionary(long id, IArrowArray dictionary)
+        {
+            _idToDictionary[id] = dictionary;
+        }
+
+        public void AddDictionaryDelta(long id, IArrowArray dictionary)

Review comment:
       Sorry, I didn't commit the change.

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
##########
@@ -43,13 +45,48 @@ protected override void Dispose(bool disposing)
             }
         }
 
+        protected void ReadInitialDictionaries()
+        {
+            if (HasReadInitialDictionary)
+            {
+                return;
+            }
+
+            if (_lazyDictionaryMemo.IsCreated) {
+                int fieldCount = _lazyDictionaryMemo.Instance.GetFieldCount();
+                for (int i = 0; i < fieldCount; ++i)
+                {
+                    ReadArrowObject();
+                }
+            }
+            HasReadInitialDictionary = true;
+        }
+
+        protected async ValueTask ReadInitialDictionariesAsync(CancellationToken cancellationToken = default)

Review comment:
       I fixed it.

##########
File path: csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs
##########
@@ -32,7 +32,7 @@ public class ArrowStreamWriterTests
         [Fact]
         public void Ctor_LeaveOpenDefault_StreamClosedOnDispose()
         {
-            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100, createDictionaryArray: true);

Review comment:
       I fixed it.

##########
File path: csharp/test/Apache.Arrow.Tests/TestData.cs
##########
@@ -21,12 +21,13 @@ namespace Apache.Arrow.Tests
 {
     public static class TestData
     {
-        public static RecordBatch CreateSampleRecordBatch(int length)
+        //TODO: Remove the createDictionaryArray argument after all writer/reader supports DictionaryType serialization

Review comment:
       I removed the `TODO` comment.
   

##########
File path: csharp/test/Apache.Arrow.Tests/TestData.cs
##########
@@ -21,12 +21,13 @@ namespace Apache.Arrow.Tests
 {
     public static class TestData
     {
-        public static RecordBatch CreateSampleRecordBatch(int length)
+        //TODO: Remove the createDictionaryArray argument after all writer/reader supports DictionaryType serialization
+        public static RecordBatch CreateSampleRecordBatch(int length, bool createDictionaryArray = false)

Review comment:
       I understand.
   I fixed the tests to use `[Theory]`.

##########
File path: csharp/src/Apache.Arrow/Ipc/MessageSerializer.cs
##########
@@ -73,13 +73,27 @@ internal static Schema GetSchema(Flatbuf.Schema schema)
             return new Schema(fields, metadata, copyCollections: false);
         }
 
-        private static Field FieldFromFlatbuffer(Flatbuf.Field flatbufField)
+        private static Field FieldFromFlatbuffer(Flatbuf.Field flatbufField, LazyCreator<DictionaryMemo> lazyDictionaryMemo)
         {
             Field[] childFields = flatbufField.ChildrenLength > 0 ? new Field[flatbufField.ChildrenLength] : null;
             for (int i = 0; i < flatbufField.ChildrenLength; i++)
             {
                 Flatbuf.Field? childFlatbufField = flatbufField.Children(i);
-                childFields[i] = FieldFromFlatbuffer(childFlatbufField.Value);
+                childFields[i] = FieldFromFlatbuffer(childFlatbufField.Value, lazyDictionaryMemo);
+            }
+
+            Flatbuf.DictionaryEncoding? de = flatbufField.Dictionary;
+            IArrowType type = GetFieldArrowType(flatbufField, childFields);
+
+            if (de.HasValue)
+            {
+                Flatbuf.Int? indexTypeAsInt = de.Value.IndexType;
+                if (!indexTypeAsInt.HasValue)
+                {
+                    throw new InvalidDataException("Dictionary type not defined");

Review comment:
       I fixed it.

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
##########
@@ -59,6 +96,66 @@ protected async ValueTask<RecordBatch> ReadRecordBatchAsync(CancellationToken ca
         {
             await ReadSchemaAsync().ConfigureAwait(false);
 
+            await ReadInitialDictionariesAsync().ConfigureAwait(false);
+
+            return await ReadArrowObjectAsync().ConfigureAwait(false);

Review comment:
       I fixed it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org