You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/07/13 17:02:27 UTC

[arrow] 32/43: ARROW-5908: [C#] ArrowStreamWriter doesn't align buffers to 8 bytes

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch maint-0.14.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 0c70e81675bbae75b6d2aa50e1f83fcf92ce34ba
Author: Eric Erhardt <er...@microsoft.com>
AuthorDate: Thu Jul 11 17:05:53 2019 -0500

    ARROW-5908: [C#] ArrowStreamWriter doesn't align buffers to 8 bytes
    
    Ensure 8-byte alignment on each buffer in a RecordBatch as specified in https://arrow.apache.org/docs/format/Layout.html#requirements-goals-and-non-goals
    
    >It is required to have all the contiguous memory buffers in an IPC payload aligned at 8-byte boundaries. In other words, each buffer must start at an aligned 8-byte offset. Additionally, each buffer should be padded to a multiple of 8 bytes.
    
    /cc @pgovind @stephentoub @imback82
    
    @wesm - If possible, can we also include this patch in the next release (0.14.1 or 0.15.0)? We hit this issue trying to update .NET for Apache Spark to the latest Arrow release - https://github.com/dotnet/spark/pull/167.
    
    Author: Eric Erhardt <er...@microsoft.com>
    
    Closes #4851 from eerhardt/FixWriterPadding and squashes the following commits:
    
    76807e938 <Eric Erhardt> PR feedback
    7ecda78c6 <Eric Erhardt> Ensure 8-byte alignment on each buffer in a RecordBatch.
---
 csharp/src/Apache.Arrow/BitUtility.cs              |  8 +++
 csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs   | 30 +++++++-----
 .../ArrowWriterBenchmark.cs                        | 57 ++++++++++++++++++++++
 .../Apache.Arrow.Tests/ArrowStreamWriterTests.cs   | 56 +++++++++++++++++++++
 csharp/test/Apache.Arrow.Tests/TestData.cs         | 50 +++++++++++--------
 5 files changed, 169 insertions(+), 32 deletions(-)

diff --git a/csharp/src/Apache.Arrow/BitUtility.cs b/csharp/src/Apache.Arrow/BitUtility.cs
index a5da46b..7d2cfbf 100644
--- a/csharp/src/Apache.Arrow/BitUtility.cs
+++ b/csharp/src/Apache.Arrow/BitUtility.cs
@@ -117,6 +117,14 @@ namespace Apache.Arrow
             RoundUpToMultiplePowerOfTwo(n, 64);
 
         /// <summary>
+        /// Rounds an integer up to the nearest multiple of 8.
+        /// </summary>
+        /// <param name="n">Integer to round.</param>
+        /// <returns>Integer rounded up to the nearest multiple of 8.</returns>
+        public static long RoundUpToMultipleOf8(long n) =>
+            RoundUpToMultiplePowerOfTwo(n, 8);
+
+        /// <summary>
         /// Rounds an integer up to the nearest multiple of factor, where
         /// factor must be a power of two.
         /// 
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
index 8488175..e1da448 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
@@ -45,17 +45,15 @@ namespace Apache.Arrow.Ipc
             IArrowArrayVisitor<StringArray>,
             IArrowArrayVisitor<BinaryArray>
         {
-            public struct Buffer
+            public readonly struct Buffer
             {
                 public readonly ArrowBuffer DataBuffer;
                 public readonly int Offset;
-                public readonly int Length;
 
-                public Buffer(ArrowBuffer buffer, int offset, int length)
+                public Buffer(ArrowBuffer buffer, int offset)
                 {
                     DataBuffer = buffer;
                     Offset = offset;
-                    Length = length;
                 }
             }
 
@@ -124,9 +122,10 @@ namespace Apache.Arrow.Ipc
             {
                 var offset = TotalLength;
 
-                TotalLength += buffer.Length;
+                int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length));
+                TotalLength += paddedLength;
 
-                return new Buffer(buffer, offset, buffer.Length);
+                return new Buffer(buffer, offset);
             }
 
             public void Visit(IArrowArray array)
@@ -215,7 +214,7 @@ namespace Apache.Arrow.Ipc
             for (var i = buffers.Count - 1; i >= 0; i--)
             {
                 Flatbuf.Buffer.CreateBuffer(Builder,
-                    buffers[i].Offset, buffers[i].Length);
+                    buffers[i].Offset, buffers[i].DataBuffer.Length);
             }
 
             var buffersVectorOffset = Builder.EndVector();
@@ -238,11 +237,20 @@ namespace Apache.Arrow.Ipc
 
             for (var i = 0; i < buffers.Count; i++)
             {
-                if (buffers[i].DataBuffer.IsEmpty)
+                ArrowBuffer buffer = buffers[i].DataBuffer;
+                if (buffer.IsEmpty)
                     continue;
 
-                await WriteBufferAsync(buffers[i].DataBuffer, cancellationToken).ConfigureAwait(false);
-                bodyLength += buffers[i].DataBuffer.Length;
+                await WriteBufferAsync(buffer, cancellationToken).ConfigureAwait(false);
+
+                int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length));
+                int padding = paddedLength - buffer.Length;
+                if (padding > 0)
+                {
+                    await WritePaddingAsync(padding).ConfigureAwait(false);
+                }
+
+                bodyLength += paddedLength;
             }
 
             // Write padding so the record batch message body length is a multiple of 8 bytes
@@ -332,7 +340,7 @@ namespace Apache.Arrow.Ipc
             where T: struct
         {
             var messageOffset = Flatbuf.Message.CreateMessage(
-                Builder, CurrentMetadataVersion, headerType, headerOffset.Value, 
+                Builder, CurrentMetadataVersion, headerType, headerOffset.Value,
                 bodyLength);
 
             Builder.Finish(messageOffset.Value);
diff --git a/csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs b/csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs
new file mode 100644
index 0000000..5b19486
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Apache.Arrow.Ipc;
+using Apache.Arrow.Tests;
+using BenchmarkDotNet.Attributes;
+using System.IO;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow.Benchmarks
+{
+    //[EtwProfiler] - needs elevated privileges
+    [MemoryDiagnoser]
+    public class ArrowWriterBenchmark
+    {
+        [Params(10_000, 1_000_000)]
+        public int BatchLength{ get; set; }
+
+        [Params(10, 25)]
+        public int ColumnSetCount { get; set; }
+
+        private MemoryStream _memoryStream;
+        private RecordBatch _batch;
+
+        [GlobalSetup]
+        public void GlobalSetup()
+        {
+            _batch = TestData.CreateSampleRecordBatch(BatchLength, ColumnSetCount);
+            _memoryStream = new MemoryStream();
+        }
+
+        [IterationSetup]
+        public void Setup()
+        {
+            _memoryStream.Position = 0;
+        }
+
+        [Benchmark]
+        public async Task WriteBatch()
+        {
+            ArrowStreamWriter writer = new ArrowStreamWriter(_memoryStream, _batch.Schema);
+            await writer.WriteRecordBatchAsync(_batch);
+        }
+    }
+}
diff --git a/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs
index 83a97f3..06be8bd 100644
--- a/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs
+++ b/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs
@@ -14,6 +14,7 @@
 // limitations under the License.
 
 using Apache.Arrow.Ipc;
+using Apache.Arrow.Types;
 using System;
 using System.IO;
 using System.Linq;
@@ -139,5 +140,60 @@ namespace Apache.Arrow.Tests
                 }
             }
         }
+
+        [Fact]
+        public async Task WriteBatchWithCorrectPadding()
+        {
+            byte value1 = 0x04;
+            byte value2 = 0x14;
+            var batch = new RecordBatch(
+                new Schema.Builder()
+                    .Field(f => f.Name("age").DataType(Int32Type.Default))
+                    .Field(f => f.Name("characterCount").DataType(Int32Type.Default))
+                    .Build(),
+                new IArrowArray[]
+                {
+                    new Int32Array(
+                        new ArrowBuffer(new byte[] { value1, value1, 0x00, 0x00 }),
+                        ArrowBuffer.Empty,
+                        length: 1,
+                        nullCount: 0,
+                        offset: 0),
+                    new Int32Array(
+                        new ArrowBuffer(new byte[] { value2, value2, 0x00, 0x00 }),
+                        ArrowBuffer.Empty,
+                        length: 1,
+                        nullCount: 0,
+                        offset: 0)
+                },
+                length: 1);
+
+            await TestRoundTripRecordBatch(batch);
+
+            using (MemoryStream stream = new MemoryStream())
+            {
+                using (var writer = new ArrowStreamWriter(stream, batch.Schema, leaveOpen: true))
+                {
+                    await writer.WriteRecordBatchAsync(batch);
+                }
+
+                byte[] writtenBytes = stream.ToArray();
+
+                // ensure each 4-byte data buffer at the end is zero-padded out to an 8-byte boundary
+                Assert.Equal(value1, writtenBytes[writtenBytes.Length - 16]);
+                Assert.Equal(value1, writtenBytes[writtenBytes.Length - 15]);
+                for (int i = 14; i > 8; i--)
+                {
+                    Assert.Equal(0, writtenBytes[writtenBytes.Length - i]);
+                }
+
+                Assert.Equal(value2, writtenBytes[writtenBytes.Length - 8]);
+                Assert.Equal(value2, writtenBytes[writtenBytes.Length - 7]);
+                for (int i = 6; i > 0; i--)
+                {
+                    Assert.Equal(0, writtenBytes[writtenBytes.Length - i]);
+                }
+            }
+        }
     }
 }
diff --git a/csharp/test/Apache.Arrow.Tests/TestData.cs b/csharp/test/Apache.Arrow.Tests/TestData.cs
index 1bc046d..15774a7 100644
--- a/csharp/test/Apache.Arrow.Tests/TestData.cs
+++ b/csharp/test/Apache.Arrow.Tests/TestData.cs
@@ -23,26 +23,34 @@ namespace Apache.Arrow.Tests
     {
         public static RecordBatch CreateSampleRecordBatch(int length)
         {
+            return CreateSampleRecordBatch(length, columnSetCount: 1);
+        }
+
+        public static RecordBatch CreateSampleRecordBatch(int length, int columnSetCount)
+        {
             Schema.Builder builder = new Schema.Builder();
-            builder.Field(CreateField(BooleanType.Default));
-            builder.Field(CreateField(UInt8Type.Default));
-            builder.Field(CreateField(Int8Type.Default));
-            builder.Field(CreateField(UInt16Type.Default));
-            builder.Field(CreateField(Int16Type.Default));
-            builder.Field(CreateField(UInt32Type.Default));
-            builder.Field(CreateField(Int32Type.Default));
-            builder.Field(CreateField(UInt64Type.Default));
-            builder.Field(CreateField(Int64Type.Default));
-            builder.Field(CreateField(FloatType.Default));
-            builder.Field(CreateField(DoubleType.Default));
-            //builder.Field(CreateField(new DecimalType(19, 2)));
-            //builder.Field(CreateField(HalfFloatType.Default));
-            //builder.Field(CreateField(StringType.Default));
-            //builder.Field(CreateField(Date32Type.Default));
-            //builder.Field(CreateField(Date64Type.Default));
-            //builder.Field(CreateField(Time32Type.Default));
-            //builder.Field(CreateField(Time64Type.Default));
-            //builder.Field(CreateField(TimestampType.Default));
+            for (int i = 0; i < columnSetCount; i++)
+            {
+                builder.Field(CreateField(BooleanType.Default, i));
+                builder.Field(CreateField(UInt8Type.Default, i));
+                builder.Field(CreateField(Int8Type.Default, i));
+                builder.Field(CreateField(UInt16Type.Default, i));
+                builder.Field(CreateField(Int16Type.Default, i));
+                builder.Field(CreateField(UInt32Type.Default, i));
+                builder.Field(CreateField(Int32Type.Default, i));
+                builder.Field(CreateField(UInt64Type.Default, i));
+                builder.Field(CreateField(Int64Type.Default, i));
+                builder.Field(CreateField(FloatType.Default, i));
+                builder.Field(CreateField(DoubleType.Default, i));
+                //builder.Field(CreateField(new DecimalType(19, 2)));
+                //builder.Field(CreateField(HalfFloatType.Default));
+                //builder.Field(CreateField(StringType.Default));
+                //builder.Field(CreateField(Date32Type.Default));
+                //builder.Field(CreateField(Date64Type.Default));
+                //builder.Field(CreateField(Time32Type.Default));
+                //builder.Field(CreateField(Time64Type.Default));
+                //builder.Field(CreateField(TimestampType.Default));
+            }
 
             Schema schema = builder.Build();
 
@@ -51,9 +59,9 @@ namespace Apache.Arrow.Tests
             return new RecordBatch(schema, arrays, length);
         }
 
-        private static Field CreateField(ArrowType type)
+        private static Field CreateField(ArrowType type, int iteration)
         {
-            return new Field(type.Name, type, nullable: false);
+            return new Field(type.Name + iteration, type, nullable: false);
         }
 
         private static IEnumerable<IArrowArray> CreateArrays(Schema schema, int length)