You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/07/11 22:06:11 UTC
[arrow] branch master updated: ARROW-5908: [C#] ArrowStreamWriter doesn't align buffers to 8 bytes
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 15fca3d ARROW-5908: [C#] ArrowStreamWriter doesn't align buffers to 8 bytes
15fca3d is described below
commit 15fca3d0169479fc4d21619b22515d95cd264fc5
Author: Eric Erhardt <er...@microsoft.com>
AuthorDate: Thu Jul 11 17:05:53 2019 -0500
ARROW-5908: [C#] ArrowStreamWriter doesn't align buffers to 8 bytes
Ensure 8-byte alignment on each buffer in a RecordBatch as specified in https://arrow.apache.org/docs/format/Layout.html#requirements-goals-and-non-goals
>It is required to have all the contiguous memory buffers in an IPC payload aligned at 8-byte boundaries. In other words, each buffer must start at an aligned 8-byte offset. Additionally, each buffer should be padded to a multiple of 8 bytes.
/cc @pgovind @stephentoub @imback82
@wesm - If possible, can we also include this patch in the next release (0.14.1 or 0.15.0)? We hit this issue trying to update .NET for Apache Spark to the latest Arrow release - https://github.com/dotnet/spark/pull/167.
Author: Eric Erhardt <er...@microsoft.com>
Closes #4851 from eerhardt/FixWriterPadding and squashes the following commits:
76807e938 <Eric Erhardt> PR feedback
7ecda78c6 <Eric Erhardt> Ensure 8-byte alignment on each buffer in a RecordBatch.
---
csharp/src/Apache.Arrow/BitUtility.cs | 8 +++
csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs | 30 +++++++-----
.../ArrowWriterBenchmark.cs | 57 ++++++++++++++++++++++
.../Apache.Arrow.Tests/ArrowStreamWriterTests.cs | 56 +++++++++++++++++++++
csharp/test/Apache.Arrow.Tests/TestData.cs | 50 +++++++++++--------
5 files changed, 169 insertions(+), 32 deletions(-)
diff --git a/csharp/src/Apache.Arrow/BitUtility.cs b/csharp/src/Apache.Arrow/BitUtility.cs
index a5da46b..7d2cfbf 100644
--- a/csharp/src/Apache.Arrow/BitUtility.cs
+++ b/csharp/src/Apache.Arrow/BitUtility.cs
@@ -117,6 +117,14 @@ namespace Apache.Arrow
RoundUpToMultiplePowerOfTwo(n, 64);
/// <summary>
+ /// Rounds an integer up to the nearest multiple of 8.
+ /// </summary>
+ /// <param name="n">Integer to round.</param>
+ /// <returns>Integer rounded up to the nearest multiple of 8.</returns>
+ public static long RoundUpToMultipleOf8(long n) =>
+ RoundUpToMultiplePowerOfTwo(n, 8);
+
+ /// <summary>
/// Rounds an integer up to the nearest multiple of factor, where
/// factor must be a power of two.
///
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
index 8488175..e1da448 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
@@ -45,17 +45,15 @@ namespace Apache.Arrow.Ipc
IArrowArrayVisitor<StringArray>,
IArrowArrayVisitor<BinaryArray>
{
- public struct Buffer
+ public readonly struct Buffer
{
public readonly ArrowBuffer DataBuffer;
public readonly int Offset;
- public readonly int Length;
- public Buffer(ArrowBuffer buffer, int offset, int length)
+ public Buffer(ArrowBuffer buffer, int offset)
{
DataBuffer = buffer;
Offset = offset;
- Length = length;
}
}
@@ -124,9 +122,10 @@ namespace Apache.Arrow.Ipc
{
var offset = TotalLength;
- TotalLength += buffer.Length;
+ int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length));
+ TotalLength += paddedLength;
- return new Buffer(buffer, offset, buffer.Length);
+ return new Buffer(buffer, offset);
}
public void Visit(IArrowArray array)
@@ -215,7 +214,7 @@ namespace Apache.Arrow.Ipc
for (var i = buffers.Count - 1; i >= 0; i--)
{
Flatbuf.Buffer.CreateBuffer(Builder,
- buffers[i].Offset, buffers[i].Length);
+ buffers[i].Offset, buffers[i].DataBuffer.Length);
}
var buffersVectorOffset = Builder.EndVector();
@@ -238,11 +237,20 @@ namespace Apache.Arrow.Ipc
for (var i = 0; i < buffers.Count; i++)
{
- if (buffers[i].DataBuffer.IsEmpty)
+ ArrowBuffer buffer = buffers[i].DataBuffer;
+ if (buffer.IsEmpty)
continue;
- await WriteBufferAsync(buffers[i].DataBuffer, cancellationToken).ConfigureAwait(false);
- bodyLength += buffers[i].DataBuffer.Length;
+ await WriteBufferAsync(buffer, cancellationToken).ConfigureAwait(false);
+
+ int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length));
+ int padding = paddedLength - buffer.Length;
+ if (padding > 0)
+ {
+ await WritePaddingAsync(padding).ConfigureAwait(false);
+ }
+
+ bodyLength += paddedLength;
}
// Write padding so the record batch message body length is a multiple of 8 bytes
@@ -332,7 +340,7 @@ namespace Apache.Arrow.Ipc
where T: struct
{
var messageOffset = Flatbuf.Message.CreateMessage(
- Builder, CurrentMetadataVersion, headerType, headerOffset.Value,
+ Builder, CurrentMetadataVersion, headerType, headerOffset.Value,
bodyLength);
Builder.Finish(messageOffset.Value);
diff --git a/csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs b/csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs
new file mode 100644
index 0000000..5b19486
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Apache.Arrow.Ipc;
+using Apache.Arrow.Tests;
+using BenchmarkDotNet.Attributes;
+using System.IO;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow.Benchmarks
+{
+ //[EtwProfiler] - needs elevated privileges
+ [MemoryDiagnoser]
+ public class ArrowWriterBenchmark
+ {
+ [Params(10_000, 1_000_000)]
+ public int BatchLength{ get; set; }
+
+ [Params(10, 25)]
+ public int ColumnSetCount { get; set; }
+
+ private MemoryStream _memoryStream;
+ private RecordBatch _batch;
+
+ [GlobalSetup]
+ public void GlobalSetup()
+ {
+ _batch = TestData.CreateSampleRecordBatch(BatchLength, ColumnSetCount);
+ _memoryStream = new MemoryStream();
+ }
+
+ [IterationSetup]
+ public void Setup()
+ {
+ _memoryStream.Position = 0;
+ }
+
+ [Benchmark]
+ public async Task WriteBatch()
+ {
+ ArrowStreamWriter writer = new ArrowStreamWriter(_memoryStream, _batch.Schema);
+ await writer.WriteRecordBatchAsync(_batch);
+ }
+ }
+}
diff --git a/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs
index 83a97f3..06be8bd 100644
--- a/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs
+++ b/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs
@@ -14,6 +14,7 @@
// limitations under the License.
using Apache.Arrow.Ipc;
+using Apache.Arrow.Types;
using System;
using System.IO;
using System.Linq;
@@ -139,5 +140,60 @@ namespace Apache.Arrow.Tests
}
}
}
+
+ [Fact]
+ public async Task WriteBatchWithCorrectPadding()
+ {
+ byte value1 = 0x04;
+ byte value2 = 0x14;
+ var batch = new RecordBatch(
+ new Schema.Builder()
+ .Field(f => f.Name("age").DataType(Int32Type.Default))
+ .Field(f => f.Name("characterCount").DataType(Int32Type.Default))
+ .Build(),
+ new IArrowArray[]
+ {
+ new Int32Array(
+ new ArrowBuffer(new byte[] { value1, value1, 0x00, 0x00 }),
+ ArrowBuffer.Empty,
+ length: 1,
+ nullCount: 0,
+ offset: 0),
+ new Int32Array(
+ new ArrowBuffer(new byte[] { value2, value2, 0x00, 0x00 }),
+ ArrowBuffer.Empty,
+ length: 1,
+ nullCount: 0,
+ offset: 0)
+ },
+ length: 1);
+
+ await TestRoundTripRecordBatch(batch);
+
+ using (MemoryStream stream = new MemoryStream())
+ {
+ using (var writer = new ArrowStreamWriter(stream, batch.Schema, leaveOpen: true))
+ {
+ await writer.WriteRecordBatchAsync(batch);
+ }
+
+ byte[] writtenBytes = stream.ToArray();
+
+ // ensure that the data buffers at the end are 8-byte aligned
+ Assert.Equal(value1, writtenBytes[writtenBytes.Length - 16]);
+ Assert.Equal(value1, writtenBytes[writtenBytes.Length - 15]);
+ for (int i = 14; i > 8; i--)
+ {
+ Assert.Equal(0, writtenBytes[writtenBytes.Length - i]);
+ }
+
+ Assert.Equal(value2, writtenBytes[writtenBytes.Length - 8]);
+ Assert.Equal(value2, writtenBytes[writtenBytes.Length - 7]);
+ for (int i = 6; i > 0; i--)
+ {
+ Assert.Equal(0, writtenBytes[writtenBytes.Length - i]);
+ }
+ }
+ }
}
}
diff --git a/csharp/test/Apache.Arrow.Tests/TestData.cs b/csharp/test/Apache.Arrow.Tests/TestData.cs
index 1bc046d..15774a7 100644
--- a/csharp/test/Apache.Arrow.Tests/TestData.cs
+++ b/csharp/test/Apache.Arrow.Tests/TestData.cs
@@ -23,26 +23,34 @@ namespace Apache.Arrow.Tests
{
public static RecordBatch CreateSampleRecordBatch(int length)
{
+ return CreateSampleRecordBatch(length, columnSetCount: 1);
+ }
+
+ public static RecordBatch CreateSampleRecordBatch(int length, int columnSetCount)
+ {
Schema.Builder builder = new Schema.Builder();
- builder.Field(CreateField(BooleanType.Default));
- builder.Field(CreateField(UInt8Type.Default));
- builder.Field(CreateField(Int8Type.Default));
- builder.Field(CreateField(UInt16Type.Default));
- builder.Field(CreateField(Int16Type.Default));
- builder.Field(CreateField(UInt32Type.Default));
- builder.Field(CreateField(Int32Type.Default));
- builder.Field(CreateField(UInt64Type.Default));
- builder.Field(CreateField(Int64Type.Default));
- builder.Field(CreateField(FloatType.Default));
- builder.Field(CreateField(DoubleType.Default));
- //builder.Field(CreateField(new DecimalType(19, 2)));
- //builder.Field(CreateField(HalfFloatType.Default));
- //builder.Field(CreateField(StringType.Default));
- //builder.Field(CreateField(Date32Type.Default));
- //builder.Field(CreateField(Date64Type.Default));
- //builder.Field(CreateField(Time32Type.Default));
- //builder.Field(CreateField(Time64Type.Default));
- //builder.Field(CreateField(TimestampType.Default));
+ for (int i = 0; i < columnSetCount; i++)
+ {
+ builder.Field(CreateField(BooleanType.Default, i));
+ builder.Field(CreateField(UInt8Type.Default, i));
+ builder.Field(CreateField(Int8Type.Default, i));
+ builder.Field(CreateField(UInt16Type.Default, i));
+ builder.Field(CreateField(Int16Type.Default, i));
+ builder.Field(CreateField(UInt32Type.Default, i));
+ builder.Field(CreateField(Int32Type.Default, i));
+ builder.Field(CreateField(UInt64Type.Default, i));
+ builder.Field(CreateField(Int64Type.Default, i));
+ builder.Field(CreateField(FloatType.Default, i));
+ builder.Field(CreateField(DoubleType.Default, i));
+ //builder.Field(CreateField(new DecimalType(19, 2)));
+ //builder.Field(CreateField(HalfFloatType.Default));
+ //builder.Field(CreateField(StringType.Default));
+ //builder.Field(CreateField(Date32Type.Default));
+ //builder.Field(CreateField(Date64Type.Default));
+ //builder.Field(CreateField(Time32Type.Default));
+ //builder.Field(CreateField(Time64Type.Default));
+ //builder.Field(CreateField(TimestampType.Default));
+ }
Schema schema = builder.Build();
@@ -51,9 +59,9 @@ namespace Apache.Arrow.Tests
return new RecordBatch(schema, arrays, length);
}
- private static Field CreateField(ArrowType type)
+ private static Field CreateField(ArrowType type, int iteration)
{
- return new Field(type.Name, type, nullable: false);
+ return new Field(type.Name + iteration, type, nullable: false);
}
private static IEnumerable<IArrowArray> CreateArrays(Schema schema, int length)