You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/03/07 15:00:35 UTC

[arrow] branch master updated: ARROW-4502: [C#] Add support for zero-copy reads

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d2dbf1e  ARROW-4502: [C#] Add support for zero-copy reads
d2dbf1e is described below

commit d2dbf1e169b3455964babc4fea161cb48ca89e20
Author: Eric Erhardt <er...@microsoft.com>
AuthorDate: Thu Mar 7 09:00:12 2019 -0600

    ARROW-4502: [C#] Add support for zero-copy reads
    
    - Update to the latest Google FlatBuffers code to support Spans/Memory.
    
    - Add a constructor for ArrowStreamReader that takes a ReadOnlyMemory<byte>.
    - Add a synchronous ReadNextRecordBatch() method.
    
    - Since we are now enabling Spans with FlatBuffers, we need to change the way we write to streams in the ArrowStreamWriter to use Memory<byte> instead of byte[]. The Stream.WriteAsync overload that accepts ReadOnlyMemory<byte> exists in netcoreapp2.1 but not in netstandard1.3, so cross-compile for netcoreapp2.1 and add a shim for netstandard.
    
    ~Unit tests are coming. I currently haven't found a great way to "read" arrow streams out of thin air. My initial thought is to use the writer to write some made up data, and then read it in using the reader and ensure the values coming back are the same. @wesm - does that sound like a good approach? I was using a binary file (that was written by PyArrow) locally to test this out.~
    
    ~I also plan on adding some benchmark tests to compare between the Stream and the ReadOnlyMemory implementations, but again am having trouble with "how to get the stream to read?".~
    
    @stephentoub @pgovind @chutchinson
    
    Author: Eric Erhardt <er...@microsoft.com>
    
    Closes #3736 from eerhardt/ZeroCopyReads and squashes the following commits:
    
    21f41bab6 <Eric Erhardt> Add RAT exclude for csharp benchmark tests csproj
    558ec56ad <Eric Erhardt> Address PR feedback.
    6ebc80e73 <Eric Erhardt> Add perf benchmarks for the ArrowStreamReader.
    18db336a8 <Eric Erhardt> Respond to PR feedback.
    98e1b11ff <Eric Erhardt> Add more types to tests.
    f6942cfab <Eric Erhardt> Add initial unit tests for ArrowStreamReader.
    f33e294c6 <Eric Erhardt> ARROW-4502:  Add support for zero-copy reads
---
 csharp/Apache.Arrow.sln                            |   6 +
 csharp/src/Apache.Arrow/Apache.Arrow.csproj        |   6 +-
 csharp/src/Apache.Arrow/ArrowBuffer.cs             |   2 +-
 .../Extensions/StreamExtensions.netstandard.cs     |  54 ++++
 .../Apache.Arrow/Flatbuf/FlatBuffers/ByteBuffer.cs | 321 +++++++++++----------
 csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs     | 188 +-----------
 ...eReader.cs => ArrowFileReaderImplementation.cs} |  68 ++---
 csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs     |  22 +-
 .../Ipc/ArrowMemoryReaderImplementation.cs         |  98 +++++++
 .../Apache.Arrow/Ipc/ArrowReaderImplementation.cs  | 208 +++++++++++++
 csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs   | 263 ++---------------
 .../Ipc/ArrowStreamReaderImplementation.cs         | 164 +++++++++++
 csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs   |  29 +-
 .../Apache.Arrow/Ipc/ArrowTypeFlatbufferBuilder.cs |   1 +
 .../Ipc/ReadOnlyMemoryBufferAllocator.cs           |  39 +++
 csharp/src/Apache.Arrow/RecordBatch.cs             |  12 +
 .../Apache.Arrow.Benchmarks.csproj                 |  19 ++
 .../ArrowReaderBenchmark.cs                        | 119 ++++++++
 csharp/test/Apache.Arrow.Benchmarks/Program.cs     |  29 ++
 .../Apache.Arrow.Tests/ArrowStreamReaderTests.cs   | 130 +++++++++
 csharp/test/Apache.Arrow.Tests/TestData.cs         | 171 +++++++++++
 dev/release/rat_exclude_files.txt                  |   1 +
 22 files changed, 1308 insertions(+), 642 deletions(-)

diff --git a/csharp/Apache.Arrow.sln b/csharp/Apache.Arrow.sln
index 53b463c..8b86561 100644
--- a/csharp/Apache.Arrow.sln
+++ b/csharp/Apache.Arrow.sln
@@ -7,6 +7,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow", "src\Apache.
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Tests", "test\Apache.Arrow.Tests\Apache.Arrow.Tests.csproj", "{9CCEC01B-E67A-4726-BE72-7B514F76163F}"
 EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Benchmarks", "test\Apache.Arrow.Benchmarks\Apache.Arrow.Benchmarks.csproj", "{742DF47D-77C5-4B84-9E0C-69645F1161EA}"
+EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU
@@ -21,6 +23,10 @@ Global
 		{9CCEC01B-E67A-4726-BE72-7B514F76163F}.Debug|Any CPU.Build.0 = Debug|Any CPU
 		{9CCEC01B-E67A-4726-BE72-7B514F76163F}.Release|Any CPU.ActiveCfg = Release|Any CPU
 		{9CCEC01B-E67A-4726-BE72-7B514F76163F}.Release|Any CPU.Build.0 = Release|Any CPU
+		{742DF47D-77C5-4B84-9E0C-69645F1161EA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{742DF47D-77C5-4B84-9E0C-69645F1161EA}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{742DF47D-77C5-4B84-9E0C-69645F1161EA}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{742DF47D-77C5-4B84-9E0C-69645F1161EA}.Release|Any CPU.Build.0 = Release|Any CPU
 	EndGlobalSection
 	GlobalSection(SolutionProperties) = preSolution
 		HideSolutionNode = FALSE
diff --git a/csharp/src/Apache.Arrow/Apache.Arrow.csproj b/csharp/src/Apache.Arrow/Apache.Arrow.csproj
index c2d73ec..fde9df7 100644
--- a/csharp/src/Apache.Arrow/Apache.Arrow.csproj
+++ b/csharp/src/Apache.Arrow/Apache.Arrow.csproj
@@ -3,7 +3,7 @@
   <Import Project="../../build/Common.props" />
 
   <PropertyGroup>
-    <TargetFramework>netstandard1.3</TargetFramework>
+    <TargetFrameworks>netstandard1.3;netcoreapp2.1</TargetFrameworks>
     <LangVersion>7.2</LangVersion>
     <AllowUnsafeBlocks>true</AllowUnsafeBlocks>
     <Authors>Apache</Authors>
@@ -15,6 +15,7 @@
     <PackageTags>apache arrow</PackageTags>
     <Company>Apache</Company>
     <Version>0.0.1</Version>
+    <DefineConstants>$(DefineConstants);UNSAFE_BYTEBUFFER;BYTEBUFFER_NO_BOUNDS_CHECK;ENABLE_SPAN_T</DefineConstants>
   </PropertyGroup>
 
   <ItemGroup>
@@ -39,4 +40,7 @@
     </EmbeddedResource>
   </ItemGroup>
 
+  <ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp2.1'">
+    <Compile Remove="Extensions\StreamExtensions.netstandard.cs" />
+  </ItemGroup>
 </Project>
diff --git a/csharp/src/Apache.Arrow/ArrowBuffer.cs b/csharp/src/Apache.Arrow/ArrowBuffer.cs
index 8901ff9..8c48d39 100644
--- a/csharp/src/Apache.Arrow/ArrowBuffer.cs
+++ b/csharp/src/Apache.Arrow/ArrowBuffer.cs
@@ -23,7 +23,7 @@ namespace Apache.Arrow
     {
         public static ArrowBuffer Empty => new ArrowBuffer(Memory<byte>.Empty);
 
-        private ArrowBuffer(Memory<byte> data)
+        internal ArrowBuffer(ReadOnlyMemory<byte> data)
         {
             Memory = data;
         }
diff --git a/csharp/src/Apache.Arrow/Extensions/StreamExtensions.netstandard.cs b/csharp/src/Apache.Arrow/Extensions/StreamExtensions.netstandard.cs
new file mode 100644
index 0000000..94affa3
--- /dev/null
+++ b/csharp/src/Apache.Arrow/Extensions/StreamExtensions.netstandard.cs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Buffers;
+using System.IO;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow
+{
+    // Helpers to write ReadOnlyMemory<byte> to a Stream on netstandard, which lacks the Memory-based Stream.WriteAsync overload
+    internal static class StreamExtensions
+    {
+        public static Task WriteAsync(this Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> array))
+            {
+                return stream.WriteAsync(array.Array, array.Offset, array.Count, cancellationToken);
+            }
+            else
+            {
+                byte[] sharedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
+                buffer.Span.CopyTo(sharedBuffer);
+                return FinishWriteAsync(stream.WriteAsync(sharedBuffer, 0, buffer.Length, cancellationToken), sharedBuffer);
+            }
+        }
+
+        private static async Task FinishWriteAsync(Task writeTask, byte[] localBuffer)
+        {
+            try
+            {
+                await writeTask.ConfigureAwait(false);
+            }
+            finally
+            {
+                ArrayPool<byte>.Shared.Return(localBuffer);
+            }
+        }
+    }
+}
diff --git a/csharp/src/Apache.Arrow/Flatbuf/FlatBuffers/ByteBuffer.cs b/csharp/src/Apache.Arrow/Flatbuf/FlatBuffers/ByteBuffer.cs
index 63a35be..f8c3e9b 100644
--- a/csharp/src/Apache.Arrow/Flatbuf/FlatBuffers/ByteBuffer.cs
+++ b/csharp/src/Apache.Arrow/Flatbuf/FlatBuffers/ByteBuffer.cs
@@ -42,20 +42,24 @@ using System.Runtime.CompilerServices;
 using System.Runtime.InteropServices;
 using System.Text;
 
+#if ENABLE_SPAN_T
+using System.Buffers.Binary;
+#endif
+
 #if ENABLE_SPAN_T && !UNSAFE_BYTEBUFFER
 #error ENABLE_SPAN_T requires UNSAFE_BYTEBUFFER to also be defined
 #endif
 
 namespace FlatBuffers
 {
-    internal abstract class ByteBufferAllocator : IDisposable
+    internal abstract class ByteBufferAllocator
     {
-#if UNSAFE_BYTEBUFFER
-        public unsafe byte* Buffer
-        {
-            get;
-            protected set;
-        }
+#if ENABLE_SPAN_T
+        public abstract Span<byte> Span { get; }
+        public abstract ReadOnlySpan<byte> ReadOnlySpan { get; }
+        public abstract Memory<byte> Memory { get; }
+        public abstract ReadOnlyMemory<byte> ReadOnlyMemory { get; }
+
 #else
         public byte[] Buffer
         {
@@ -70,23 +74,17 @@ namespace FlatBuffers
             protected set;
         }
 
-        public abstract void Dispose();
-
         public abstract void GrowFront(int newSize);
-
-#if !ENABLE_SPAN_T
-        public abstract byte[] ByteArray { get; }
-#endif
     }
 
-    internal class ByteArrayAllocator : ByteBufferAllocator
+    internal sealed class ByteArrayAllocator : ByteBufferAllocator
     {
         private byte[] _buffer;
 
         public ByteArrayAllocator(byte[] buffer)
         {
             _buffer = buffer;
-            InitPointer();
+            InitBuffer();
         }
 
         public override void GrowFront(int newSize)
@@ -101,63 +99,29 @@ namespace FlatBuffers
             byte[] newBuffer = new byte[newSize];
             System.Buffer.BlockCopy(_buffer, 0, newBuffer, newSize - Length, Length);
             _buffer = newBuffer;
-            InitPointer();
+            InitBuffer();
         }
 
-        public override void Dispose()
-        {
-            GC.SuppressFinalize(this);
-#if UNSAFE_BYTEBUFFER
-            if (_handle.IsAllocated)
-            {
-                _handle.Free();
-            }
-#endif
-        }
-
-#if !ENABLE_SPAN_T
-        public override byte[] ByteArray
-        {
-            get { return _buffer; }
-        }
-#endif
-
-#if UNSAFE_BYTEBUFFER
-        private GCHandle _handle;
-
-        ~ByteArrayAllocator()
-        {
-            if (_handle.IsAllocated)
-            {
-                _handle.Free();
-            }
-        }
+#if ENABLE_SPAN_T
+        public override Span<byte> Span => _buffer;
+        public override ReadOnlySpan<byte> ReadOnlySpan => _buffer;
+        public override Memory<byte> Memory => _buffer;
+        public override ReadOnlyMemory<byte> ReadOnlyMemory => _buffer;
 #endif
 
-        private void InitPointer()
+        private void InitBuffer()
         {
             Length = _buffer.Length;
-#if UNSAFE_BYTEBUFFER
-            if (_handle.IsAllocated)
-            {
-                _handle.Free();
-            }
-            _handle = GCHandle.Alloc(_buffer, GCHandleType.Pinned);
-            unsafe
-            {
-                Buffer = (byte*)_handle.AddrOfPinnedObject().ToPointer();
-            }
-#else
+#if !ENABLE_SPAN_T
             Buffer = _buffer;
 #endif
         }
     }
 
-
     /// <summary>
     /// Class to mimic Java's ByteBuffer which is used heavily in Flatbuffers.
     /// </summary>
-    internal class ByteBuffer : IDisposable
+    internal class ByteBuffer
     {
         private ByteBufferAllocator _buffer;
         private int _pos;  // Must track start of the buffer.
@@ -178,15 +142,8 @@ namespace FlatBuffers
             _pos = pos;
         }
 
-        public void Dispose()
+        public int Position
         {
-            if (_buffer != null)
-            {
-                _buffer.Dispose();
-            }
-        }
-
-        public int Position {
             get { return _pos; }
             set { _pos = value; }
         }
@@ -280,14 +237,8 @@ namespace FlatBuffers
         public T[] ToArray<T>(int pos, int len)
             where T : struct
         {
-            unsafe
-            {
-                AssertOffsetAndLength(pos, len);
-                T[] arr = new T[len];
-                var typed = MemoryMarshal.Cast<byte, T>(new Span<byte>(_buffer.Buffer + pos, _buffer.Length));
-                typed.Slice(0, arr.Length).CopyTo(arr);
-                return arr;
-            }
+            AssertOffsetAndLength(pos, len);
+            return MemoryMarshal.Cast<byte, T>(_buffer.ReadOnlySpan.Slice(pos)).Slice(0, len).ToArray();
         }
 #else
         public T[] ToArray<T>(int pos, int len)
@@ -295,7 +246,7 @@ namespace FlatBuffers
         {
             AssertOffsetAndLength(pos, len);
             T[] arr = new T[len];
-            Buffer.BlockCopy(_buffer.ByteArray, pos, arr, 0, ArraySize(arr));
+            Buffer.BlockCopy(_buffer.Buffer, pos, arr, 0, ArraySize(arr));
             return arr;
         }
 #endif
@@ -310,23 +261,30 @@ namespace FlatBuffers
             return ToArray<byte>(0, Length);
         }
 
-
 #if ENABLE_SPAN_T
-        public unsafe Span<byte> ToSpan(int pos, int len)
+        public ReadOnlyMemory<byte> ToReadOnlyMemory(int pos, int len)
+        {
+            return _buffer.ReadOnlyMemory.Slice(pos, len);
+        }
+
+        public Memory<byte> ToMemory(int pos, int len)
+        {
+            return _buffer.Memory.Slice(pos, len);
+        }
+
+        public Span<byte> ToSpan(int pos, int len)
         {
-            return new Span<byte>(_buffer.Buffer, _buffer.Length).Slice(pos, len);
+            return _buffer.Span.Slice(pos, len);
         }
 #else
         public ArraySegment<byte> ToArraySegment(int pos, int len)
         {
-            return new ArraySegment<byte>(_buffer.ByteArray, pos, len);
+            return new ArraySegment<byte>(_buffer.Buffer, pos, len);
         }
-#endif
 
-#if !ENABLE_SPAN_T
         public MemoryStream ToMemoryStream(int pos, int len)
         {
-            return new MemoryStream(_buffer.ByteArray, pos, len);
+            return new MemoryStream(_buffer.Buffer, pos, len);
         }
 #endif
 
@@ -347,8 +305,8 @@ namespace FlatBuffers
         static public uint ReverseBytes(uint input)
         {
             return ((input & 0x000000FFU) << 24) |
-                   ((input & 0x0000FF00U) <<  8) |
-                   ((input & 0x00FF0000U) >>  8) |
+                   ((input & 0x0000FF00U) << 8) |
+                   ((input & 0x00FF0000U) >> 8) |
                    ((input & 0xFF000000U) >> 24);
         }
         static public ulong ReverseBytes(ulong input)
@@ -356,8 +314,8 @@ namespace FlatBuffers
             return (((input & 0x00000000000000FFUL) << 56) |
                     ((input & 0x000000000000FF00UL) << 40) |
                     ((input & 0x0000000000FF0000UL) << 24) |
-                    ((input & 0x00000000FF000000UL) <<  8) |
-                    ((input & 0x000000FF00000000UL) >>  8) |
+                    ((input & 0x00000000FF000000UL) << 8) |
+                    ((input & 0x000000FF00000000UL) >> 8) |
                     ((input & 0x0000FF0000000000UL) >> 24) |
                     ((input & 0x00FF000000000000UL) >> 40) |
                     ((input & 0xFF00000000000000UL) >> 56));
@@ -391,15 +349,15 @@ namespace FlatBuffers
             {
                 for (int i = 0; i < count; i++)
                 {
-                  r |= (ulong)_buffer.Buffer[offset + i] << i * 8;
+                    r |= (ulong)_buffer.Buffer[offset + i] << i * 8;
                 }
             }
             else
             {
-              for (int i = 0; i < count; i++)
-              {
-                r |= (ulong)_buffer.Buffer[offset + count - 1 - i] << i * 8;
-              }
+                for (int i = 0; i < count; i++)
+                {
+                    r |= (ulong)_buffer.Buffer[offset + count - 1 - i] << i * 8;
+                }
             }
             return r;
         }
@@ -414,31 +372,26 @@ namespace FlatBuffers
 #endif
         }
 
-#if UNSAFE_BYTEBUFFER
+#if ENABLE_SPAN_T
 
-        public unsafe void PutSbyte(int offset, sbyte value)
+        public void PutSbyte(int offset, sbyte value)
         {
             AssertOffsetAndLength(offset, sizeof(sbyte));
-            _buffer.Buffer[offset] = (byte)value;
+            _buffer.Span[offset] = (byte)value;
         }
 
-        public unsafe void PutByte(int offset, byte value)
+        public void PutByte(int offset, byte value)
         {
             AssertOffsetAndLength(offset, sizeof(byte));
-            _buffer.Buffer[offset] = value;
+            _buffer.Span[offset] = value;
         }
 
-        public unsafe void PutByte(int offset, byte value, int count)
+        public void PutByte(int offset, byte value, int count)
         {
             AssertOffsetAndLength(offset, sizeof(byte) * count);
-            for (var i = 0; i < count; ++i)
-                _buffer.Buffer[offset + i] = value;
-        }
-
-        // this method exists in order to conform with Java ByteBuffer standards
-        public void Put(int offset, byte value)
-        {
-            PutByte(offset, value);
+            Span<byte> span = _buffer.Span.Slice(offset, count);
+            for (var i = 0; i < span.Length; ++i)
+                span[i] = value;
         }
 #else
         public void PutSbyte(int offset, sbyte value)
@@ -459,13 +412,13 @@ namespace FlatBuffers
             for (var i = 0; i < count; ++i)
                 _buffer.Buffer[offset + i] = value;
         }
+#endif
 
         // this method exists in order to conform with Java ByteBuffer standards
         public void Put(int offset, byte value)
         {
             PutByte(offset, value);
         }
-#endif
 
 #if ENABLE_SPAN_T
         public unsafe void PutStringUTF8(int offset, string value)
@@ -473,7 +426,10 @@ namespace FlatBuffers
             AssertOffsetAndLength(offset, value.Length);
             fixed (char* s = value)
             {
-                Encoding.UTF8.GetBytes(s, value.Length, _buffer.Buffer + offset, Length - offset);
+                fixed (byte* buffer = &MemoryMarshal.GetReference(_buffer.Span))
+                {
+                    Encoding.UTF8.GetBytes(s, value.Length, buffer + offset, Length - offset);
+                }
             }
         }
 #else
@@ -481,7 +437,7 @@ namespace FlatBuffers
         {
             AssertOffsetAndLength(offset, value.Length);
             Encoding.UTF8.GetBytes(value, 0, value.Length,
-                _buffer.ByteArray, offset);
+                _buffer.Buffer, offset);
         }
 #endif
 
@@ -495,10 +451,17 @@ namespace FlatBuffers
         public unsafe void PutUshort(int offset, ushort value)
         {
             AssertOffsetAndLength(offset, sizeof(ushort));
-            byte* ptr = _buffer.Buffer;
-            *(ushort*)(ptr + offset) = BitConverter.IsLittleEndian
-                ? value
-                : ReverseBytes(value);
+#if ENABLE_SPAN_T
+            Span<byte> span = _buffer.Span.Slice(offset);
+            BinaryPrimitives.WriteUInt16LittleEndian(span, value);
+#else
+            fixed (byte* ptr = _buffer.Buffer)
+            {
+                *(ushort*)(ptr + offset) = BitConverter.IsLittleEndian
+                    ? value
+                    : ReverseBytes(value);
+            }
+#endif
         }
 
         public void PutInt(int offset, int value)
@@ -509,10 +472,17 @@ namespace FlatBuffers
         public unsafe void PutUint(int offset, uint value)
         {
             AssertOffsetAndLength(offset, sizeof(uint));
-            byte* ptr = _buffer.Buffer;
-            *(uint*)(ptr + offset) = BitConverter.IsLittleEndian
-                ? value
-                : ReverseBytes(value);
+#if ENABLE_SPAN_T
+            Span<byte> span = _buffer.Span.Slice(offset);
+            BinaryPrimitives.WriteUInt32LittleEndian(span, value);
+#else
+            fixed (byte* ptr = _buffer.Buffer)
+            {
+                *(uint*)(ptr + offset) = BitConverter.IsLittleEndian
+                    ? value
+                    : ReverseBytes(value);
+            }
+#endif
         }
 
         public unsafe void PutLong(int offset, long value)
@@ -523,38 +493,56 @@ namespace FlatBuffers
         public unsafe void PutUlong(int offset, ulong value)
         {
             AssertOffsetAndLength(offset, sizeof(ulong));
-            byte* ptr = _buffer.Buffer;
-            *(ulong*)(ptr + offset) = BitConverter.IsLittleEndian
-                ? value
-                : ReverseBytes(value);
+#if ENABLE_SPAN_T
+            Span<byte> span = _buffer.Span.Slice(offset);
+            BinaryPrimitives.WriteUInt64LittleEndian(span, value);
+#else
+            fixed (byte* ptr = _buffer.Buffer)
+            {
+                *(ulong*)(ptr + offset) = BitConverter.IsLittleEndian
+                    ? value
+                    : ReverseBytes(value);
+            }
+#endif
         }
 
         public unsafe void PutFloat(int offset, float value)
         {
             AssertOffsetAndLength(offset, sizeof(float));
-            byte* ptr = _buffer.Buffer;
-            if (BitConverter.IsLittleEndian)
-            {
-                *(float*)(ptr + offset) = value;
-            }
-            else
+#if ENABLE_SPAN_T
+            fixed (byte* ptr = &MemoryMarshal.GetReference(_buffer.Span))
+#else
+            fixed (byte* ptr = _buffer.Buffer)
+#endif
             {
-                *(uint*)(ptr + offset) = ReverseBytes(*(uint*)(&value));
+                if (BitConverter.IsLittleEndian)
+                {
+                    *(float*)(ptr + offset) = value;
+                }
+                else
+                {
+                    *(uint*)(ptr + offset) = ReverseBytes(*(uint*)(&value));
+                }
             }
         }
 
         public unsafe void PutDouble(int offset, double value)
         {
             AssertOffsetAndLength(offset, sizeof(double));
-            byte* ptr = _buffer.Buffer;
-            if (BitConverter.IsLittleEndian)
-            {
-                *(double*)(ptr + offset) = value;
-
-            }
-            else
+#if ENABLE_SPAN_T
+            fixed (byte* ptr = &MemoryMarshal.GetReference(_buffer.Span))
+#else
+            fixed (byte* ptr = _buffer.Buffer)
+#endif
             {
-                *(ulong*)(ptr + offset) = ReverseBytes(*(ulong*)(&value));
+                if (BitConverter.IsLittleEndian)
+                {
+                    *(double*)(ptr + offset) = value;
+                }
+                else
+                {
+                    *(ulong*)(ptr + offset) = ReverseBytes(*(ulong*)(&value));
+                }
             }
         }
 #else // !UNSAFE_BYTEBUFFER
@@ -613,17 +601,17 @@ namespace FlatBuffers
 
 #endif // UNSAFE_BYTEBUFFER
 
-#if UNSAFE_BYTEBUFFER
-        public unsafe sbyte GetSbyte(int index)
+#if ENABLE_SPAN_T
+        public sbyte GetSbyte(int index)
         {
             AssertOffsetAndLength(index, sizeof(sbyte));
-            return (sbyte)_buffer.Buffer[index];
+            return (sbyte)_buffer.ReadOnlySpan[index];
         }
 
-        public unsafe byte Get(int index)
+        public byte Get(int index)
         {
             AssertOffsetAndLength(index, sizeof(byte));
-            return _buffer.Buffer[index];
+            return _buffer.ReadOnlySpan[index];
         }
 #else
         public sbyte GetSbyte(int index)
@@ -642,12 +630,15 @@ namespace FlatBuffers
 #if ENABLE_SPAN_T
         public unsafe string GetStringUTF8(int startPos, int len)
         {
-            return Encoding.UTF8.GetString(_buffer.Buffer + startPos, len);
+            fixed (byte* buffer = &MemoryMarshal.GetReference(_buffer.ReadOnlySpan.Slice(startPos)))
+            {
+                return Encoding.UTF8.GetString(buffer, len);
+            }
         }
 #else
         public string GetStringUTF8(int startPos, int len)
         {
-            return Encoding.UTF8.GetString(_buffer.ByteArray, startPos, len);
+            return Encoding.UTF8.GetString(_buffer.Buffer, startPos, len);
         }
 #endif
 
@@ -661,12 +652,17 @@ namespace FlatBuffers
         public unsafe ushort GetUshort(int offset)
         {
             AssertOffsetAndLength(offset, sizeof(ushort));
-            byte* ptr = _buffer.Buffer;
+#if ENABLE_SPAN_T
+            ReadOnlySpan<byte> span = _buffer.ReadOnlySpan.Slice(offset);
+            return BinaryPrimitives.ReadUInt16LittleEndian(span);
+#else
+            fixed (byte* ptr = _buffer.Buffer)
             {
                 return BitConverter.IsLittleEndian
                     ? *(ushort*)(ptr + offset)
                     : ReverseBytes(*(ushort*)(ptr + offset));
             }
+#endif
         }
 
         public int GetInt(int offset)
@@ -677,12 +673,17 @@ namespace FlatBuffers
         public unsafe uint GetUint(int offset)
         {
             AssertOffsetAndLength(offset, sizeof(uint));
-            byte* ptr = _buffer.Buffer;
+#if ENABLE_SPAN_T
+            ReadOnlySpan<byte> span = _buffer.ReadOnlySpan.Slice(offset);
+            return BinaryPrimitives.ReadUInt32LittleEndian(span);
+#else
+            fixed (byte* ptr = _buffer.Buffer)
             {
                 return BitConverter.IsLittleEndian
                     ? *(uint*)(ptr + offset)
                     : ReverseBytes(*(uint*)(ptr + offset));
             }
+#endif
         }
 
         public long GetLong(int offset)
@@ -693,18 +694,27 @@ namespace FlatBuffers
         public unsafe ulong GetUlong(int offset)
         {
             AssertOffsetAndLength(offset, sizeof(ulong));
-            byte* ptr = _buffer.Buffer;
+#if ENABLE_SPAN_T
+            ReadOnlySpan<byte> span = _buffer.ReadOnlySpan.Slice(offset);
+            return BinaryPrimitives.ReadUInt64LittleEndian(span);
+#else            
+            fixed (byte* ptr = _buffer.Buffer)
             {
                 return BitConverter.IsLittleEndian
                     ? *(ulong*)(ptr + offset)
                     : ReverseBytes(*(ulong*)(ptr + offset));
             }
+#endif
         }
 
         public unsafe float GetFloat(int offset)
         {
             AssertOffsetAndLength(offset, sizeof(float));
-            byte* ptr = _buffer.Buffer;
+#if ENABLE_SPAN_T
+            fixed (byte* ptr = &MemoryMarshal.GetReference(_buffer.ReadOnlySpan))
+#else
+            fixed (byte* ptr = _buffer.Buffer)
+#endif
             {
                 if (BitConverter.IsLittleEndian)
                 {
@@ -721,7 +731,11 @@ namespace FlatBuffers
         public unsafe double GetDouble(int offset)
         {
             AssertOffsetAndLength(offset, sizeof(double));
-            byte* ptr = _buffer.Buffer;
+#if ENABLE_SPAN_T
+            fixed (byte* ptr = &MemoryMarshal.GetReference(_buffer.ReadOnlySpan))
+#else
+            fixed (byte* ptr = _buffer.Buffer)
+#endif
             {
                 if (BitConverter.IsLittleEndian)
                 {
@@ -758,7 +772,7 @@ namespace FlatBuffers
 
         public long GetLong(int index)
         {
-           return (long)ReadLittleEndian(index, sizeof(long));
+            return (long)ReadLittleEndian(index, sizeof(long));
         }
 
         public ulong GetUlong(int index)
@@ -819,12 +833,9 @@ namespace FlatBuffers
                 AssertOffsetAndLength(offset, numBytes);
                 // if we are LE, just do a block copy
 #if ENABLE_SPAN_T
-                unsafe
-                {
-                    MemoryMarshal.Cast<T, byte>(x).CopyTo(new Span<byte>(_buffer.Buffer, _buffer.Length).Slice(offset, numBytes));
-                }
+                MemoryMarshal.Cast<T, byte>(x).CopyTo(_buffer.Span.Slice(offset, numBytes));
 #else
-                Buffer.BlockCopy(x, 0, _buffer.ByteArray, offset, numBytes);
+                Buffer.BlockCopy(x, 0, _buffer.Buffer, offset, numBytes);
 #endif
             }
             else
@@ -841,7 +852,7 @@ namespace FlatBuffers
         }
 
 #if ENABLE_SPAN_T
-        public unsafe int Put<T>(int offset, Span<T> x)
+        public int Put<T>(int offset, Span<T> x)
             where T : struct
         {
             if (x.Length == 0)
@@ -861,7 +872,7 @@ namespace FlatBuffers
                 offset -= numBytes;
                 AssertOffsetAndLength(offset, numBytes);
                 // if we are LE, just do a block copy
-                MemoryMarshal.Cast<T, byte>(x).CopyTo(new Span<byte>(_buffer.Buffer, _buffer.Length).Slice(offset, numBytes));
+                MemoryMarshal.Cast<T, byte>(x).CopyTo(_buffer.Span.Slice(offset, numBytes));
             }
             else
             {
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs b/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
index 61c7627..6d4c8da 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
@@ -13,203 +13,41 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-using FlatBuffers;
-using System;
-using System.Buffers.Binary;
 using System.IO;
-using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 
 namespace Apache.Arrow.Ipc
 {
+    /// <summary>
+    /// Implements an <see cref="ArrowStreamReader"/> that can read Arrow files.
+    /// </summary>
     public class ArrowFileReader : ArrowStreamReader
     {
+        private ArrowFileReaderImplementation Implementation =>
+            (ArrowFileReaderImplementation)_implementation;
 
-        public bool IsFileValid { get; protected set; }
+        public bool IsFileValid => Implementation.IsFileValid;
 
-        /// <summary>
-        /// When using GetNextRecordBatch this value 
-        /// is to remember what index is next
-        /// </summary>
-        private int _recordBatchIndex;
-
-        /// <summary>
-        /// Notes what byte position where the footer data is in the stream
-        /// </summary>
-        private int _footerStartPostion;
-
-        private ArrowFooter _footer;
-
-        public async Task<int> RecordBatchCountAsync()
+        public ArrowFileReader(Stream stream)
+            : base(new ArrowFileReaderImplementation(stream))
         {
-            if (!HasReadSchema)
-            {
-                await ReadSchemaAsync();
-            }
-
-            return _footer.RecordBatchCount;
-        }
-
-        public ArrowFileReader(Stream stream) : base(stream)
-        {
-            if (!stream.CanSeek)
-            {
-                throw new ArgumentException("Stream must be seekable.", nameof(stream));
-            }
-
-            if (!stream.CanRead)
-            {
-                throw new ArgumentException("Stream must be readable.");
-            }
-
-            IsFileValid = false;
         }
 
         public static ArrowFileReader FromFile(string filename)
         {
-            var stream = new FileStream(filename, FileMode.Open);
+            var stream = new FileStream(filename, FileMode.Open, FileAccess.Read);
             return new ArrowFileReader(stream);
         }
 
-        protected override async Task<Schema> ReadSchemaAsync()
+        public Task<int> RecordBatchCountAsync()
         {
-            if (HasReadSchema)
-            {
-                return Schema;
-            }
-
-            await ValidateFileAsync();
-
-            var bytesRead = 0;
-            var footerLength = 0;
-
-            await Buffers.RentReturnAsync(4, async (buffer) =>
-            {
-                BaseStream.Position = BaseStream.Length - ArrowFileConstants.Magic.Length - 4;
-
-                bytesRead = await BaseStream.ReadAsync(buffer, 0, 4);
-                footerLength = BinaryPrimitives.ReadInt32LittleEndian(buffer);
-
-                if (bytesRead != 4) throw new InvalidDataException(
-                    $"Failed to read footer length. Read <{bytesRead}>, expected 4.");
-
-                if (footerLength <= 0) throw new InvalidDataException(
-                    $"Footer length has invalid size <{footerLength}>");
-            });
-
-            await Buffers.RentReturnAsync(footerLength, async (buffer) =>
-            {
-                _footerStartPostion = (int)BaseStream.Length - footerLength - ArrowFileConstants.Magic.Length - 4;
-
-                BaseStream.Position = _footerStartPostion;
-
-                bytesRead = await BaseStream.ReadAsync(buffer, 0, footerLength);
-
-                if (bytesRead != footerLength)
-                {
-                    throw new InvalidDataException(
-                        $"Failed to read footer. Read <{bytesRead}> bytes, expected <{footerLength}>.");
-                }
-
-                // Deserialize the footer from the footer flatbuffer
-
-                _footer = new ArrowFooter(Flatbuf.Footer.GetRootAsFooter(new ByteBuffer(buffer)));
-
-                Schema = _footer.Schema;
-            });
-
-            return Schema;
+            return Implementation.RecordBatchCountAsync();
         }
 
-        public async Task<RecordBatch> ReadRecordBatchAsync(int index, CancellationToken cancellationToken = default)
+        public Task<RecordBatch> ReadRecordBatchAsync(int index, CancellationToken cancellationToken = default)
         {
-            await ReadSchemaAsync();
-
-            if (index >= _footer.RecordBatchCount)
-            {
-                throw new ArgumentOutOfRangeException(nameof(index));
-            }
-
-            var block = _footer.GetRecordBatchBlock(index);
-
-            BaseStream.Position = block.Offset;
-
-            return await ReadRecordBatchAsync(cancellationToken);
-        }
-
-        public override async Task<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
-        {
-            await ReadSchemaAsync();
-
-            if (_recordBatchIndex >= _footer.RecordBatchCount)
-            {
-                return null;
-            }
-
-            var result = await ReadRecordBatchAsync(_recordBatchIndex, cancellationToken);
-            _recordBatchIndex++;
-
-            return result;
-        }
-
-        /// <summary>
-        /// Check if file format is valid. If it's valid don't run the validation again.
-        /// </summary>
-        private async Task ValidateFileAsync()
-        {
-            if (IsFileValid)
-            {
-                return;
-            }
-
-            await ValidateMagicAsync();
-
-            IsFileValid = true;
-        }
-
-        private async Task ValidateMagicAsync()
-        {
-            var startingPosition = BaseStream.Position;
-            var magicLength = ArrowFileConstants.Magic.Length;
-
-            try
-            {
-                await Buffers.RentReturnAsync(magicLength, async (buffer) =>
-                {
-                    // Seek to the beginning of the stream
-
-                    BaseStream.Position = 0;
-
-                    // Read beginning of stream
-
-                    await BaseStream.ReadAsync(buffer, 0, magicLength);
-
-                    if (!ArrowFileConstants.Magic.SequenceEqual(buffer.Take(magicLength)))
-                    {
-                        throw new InvalidDataException(
-                            $"Invalid magic at offset <{BaseStream.Position}>");
-                    }
-
-                    // Move stream position to magic-length bytes away from the end of the stream
-
-                    BaseStream.Position = BaseStream.Length - magicLength;
-
-                    // Read the end of the stream
-
-                    await BaseStream.ReadAsync(buffer, 0, magicLength);
-
-                    if (!ArrowFileConstants.Magic.SequenceEqual(buffer.Take(magicLength)))
-                    {
-                        throw new InvalidDataException(
-                            $"Invalid magic at offset <{BaseStream.Position}>");
-                    }
-                });
-            }
-            finally
-            {
-                BaseStream.Position = startingPosition;
-            }
+            return Implementation.ReadRecordBatchAsync(index, cancellationToken);
         }
     }
 }
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs b/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
similarity index 82%
copy from csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
copy to csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
index 61c7627..634ab53 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
@@ -23,10 +23,9 @@ using System.Threading.Tasks;
 
 namespace Apache.Arrow.Ipc
 {
-    public class ArrowFileReader : ArrowStreamReader
+    internal sealed class ArrowFileReaderImplementation : ArrowStreamReaderImplementation
     {
-
-        public bool IsFileValid { get; protected set; }
+        public bool IsFileValid { get; private set; }
 
         /// <summary>
         /// When using GetNextRecordBatch this value 
@@ -41,45 +40,28 @@ namespace Apache.Arrow.Ipc
 
         private ArrowFooter _footer;
 
-        public async Task<int> RecordBatchCountAsync()
+        public ArrowFileReaderImplementation(Stream stream) : base(stream)
         {
-            if (!HasReadSchema)
-            {
-                await ReadSchemaAsync();
-            }
-
-            return _footer.RecordBatchCount;
         }
 
-        public ArrowFileReader(Stream stream) : base(stream)
+        public async Task<int> RecordBatchCountAsync()
         {
-            if (!stream.CanSeek)
-            {
-                throw new ArgumentException("Stream must be seekable.", nameof(stream));
-            }
-
-            if (!stream.CanRead)
+            if (!HasReadSchema)
             {
-                throw new ArgumentException("Stream must be readable.");
+                await ReadSchemaAsync().ConfigureAwait(false);
             }
 
-            IsFileValid = false;
-        }
-
-        public static ArrowFileReader FromFile(string filename)
-        {
-            var stream = new FileStream(filename, FileMode.Open);
-            return new ArrowFileReader(stream);
+            return _footer.RecordBatchCount;
         }
 
-        protected override async Task<Schema> ReadSchemaAsync()
+        protected override async Task ReadSchemaAsync()
         {
             if (HasReadSchema)
             {
-                return Schema;
+                return;
             }
 
-            await ValidateFileAsync();
+            await ValidateFileAsync().ConfigureAwait(false);
 
             var bytesRead = 0;
             var footerLength = 0;
@@ -88,7 +70,7 @@ namespace Apache.Arrow.Ipc
             {
                 BaseStream.Position = BaseStream.Length - ArrowFileConstants.Magic.Length - 4;
 
-                bytesRead = await BaseStream.ReadAsync(buffer, 0, 4);
+                bytesRead = await BaseStream.ReadAsync(buffer, 0, 4).ConfigureAwait(false);
                 footerLength = BinaryPrimitives.ReadInt32LittleEndian(buffer);
 
                 if (bytesRead != 4) throw new InvalidDataException(
@@ -96,7 +78,7 @@ namespace Apache.Arrow.Ipc
 
                 if (footerLength <= 0) throw new InvalidDataException(
                     $"Footer length has invalid size <{footerLength}>");
-            });
+            }).ConfigureAwait(false);
 
             await Buffers.RentReturnAsync(footerLength, async (buffer) =>
             {
@@ -104,7 +86,7 @@ namespace Apache.Arrow.Ipc
 
                 BaseStream.Position = _footerStartPostion;
 
-                bytesRead = await BaseStream.ReadAsync(buffer, 0, footerLength);
+                bytesRead = await BaseStream.ReadAsync(buffer, 0, footerLength).ConfigureAwait(false);
 
                 if (bytesRead != footerLength)
                 {
@@ -117,14 +99,12 @@ namespace Apache.Arrow.Ipc
                 _footer = new ArrowFooter(Flatbuf.Footer.GetRootAsFooter(new ByteBuffer(buffer)));
 
                 Schema = _footer.Schema;
-            });
-
-            return Schema;
+            }).ConfigureAwait(false);
         }
 
-        public async Task<RecordBatch> ReadRecordBatchAsync(int index, CancellationToken cancellationToken = default)
+        public async Task<RecordBatch> ReadRecordBatchAsync(int index, CancellationToken cancellationToken)
         {
-            await ReadSchemaAsync();
+            await ReadSchemaAsync().ConfigureAwait(false);
 
             if (index >= _footer.RecordBatchCount)
             {
@@ -135,19 +115,19 @@ namespace Apache.Arrow.Ipc
 
             BaseStream.Position = block.Offset;
 
-            return await ReadRecordBatchAsync(cancellationToken);
+            return await ReadRecordBatchAsync(cancellationToken).ConfigureAwait(false);
         }
 
-        public override async Task<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
+        public override async Task<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken)
         {
-            await ReadSchemaAsync();
+            await ReadSchemaAsync().ConfigureAwait(false);
 
             if (_recordBatchIndex >= _footer.RecordBatchCount)
             {
                 return null;
             }
 
-            var result = await ReadRecordBatchAsync(_recordBatchIndex, cancellationToken);
+            var result = await ReadRecordBatchAsync(_recordBatchIndex, cancellationToken).ConfigureAwait(false);
             _recordBatchIndex++;
 
             return result;
@@ -163,7 +143,7 @@ namespace Apache.Arrow.Ipc
                 return;
             }
 
-            await ValidateMagicAsync();
+            await ValidateMagicAsync().ConfigureAwait(false);
 
             IsFileValid = true;
         }
@@ -183,7 +163,7 @@ namespace Apache.Arrow.Ipc
 
                     // Read beginning of stream
 
-                    await BaseStream.ReadAsync(buffer, 0, magicLength);
+                    await BaseStream.ReadAsync(buffer, 0, magicLength).ConfigureAwait(false);
 
                     if (!ArrowFileConstants.Magic.SequenceEqual(buffer.Take(magicLength)))
                     {
@@ -197,14 +177,14 @@ namespace Apache.Arrow.Ipc
 
                     // Read the end of the stream
 
-                    await BaseStream.ReadAsync(buffer, 0, magicLength);
+                    await BaseStream.ReadAsync(buffer, 0, magicLength).ConfigureAwait(false);
 
                     if (!ArrowFileConstants.Magic.SequenceEqual(buffer.Take(magicLength)))
                     {
                         throw new InvalidDataException(
                             $"Invalid magic at offset <{BaseStream.Position}>");
                     }
-                });
+                }).ConfigureAwait(false);
             }
             finally
             {
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs b/csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs
index 98fbdf0..74534f1 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs
@@ -56,13 +56,14 @@ namespace Apache.Arrow.Ipc
 
             if (!HasWrittenHeader)
             {
-                await WriteHeaderAsync(cancellationToken);
+                await WriteHeaderAsync(cancellationToken).ConfigureAwait(false);
                 HasWrittenHeader = true;
             }
 
             cancellationToken.ThrowIfCancellationRequested();
 
-            var block = await WriteRecordBatchInternalAsync(recordBatch, cancellationToken);
+            var block = await WriteRecordBatchInternalAsync(recordBatch, cancellationToken)
+                .ConfigureAwait(false);
 
             RecordBatchBlocks.Add(block);
         }
@@ -71,11 +72,11 @@ namespace Apache.Arrow.Ipc
         {
             if (!HasWrittenFooter)
             {
-                await WriteFooterAsync(Schema, cancellationToken);
+                await WriteFooterAsync(Schema, cancellationToken).ConfigureAwait(false);
                 HasWrittenFooter = true;
             }
 
-            await BaseStream.FlushAsync(cancellationToken);
+            await BaseStream.FlushAsync(cancellationToken).ConfigureAwait(false);
         }
 
         private async Task WriteHeaderAsync(CancellationToken cancellationToken)
@@ -84,8 +85,9 @@ namespace Apache.Arrow.Ipc
 
             // Write magic number and empty padding up to the 8-byte boundary
 
-            await WriteMagicAsync();
-            await WritePaddingAsync(CalculatePadding(ArrowFileConstants.Magic.Length));
+            await WriteMagicAsync().ConfigureAwait(false);
+            await WritePaddingAsync(CalculatePadding(ArrowFileConstants.Magic.Length))
+                .ConfigureAwait(false);
         }
 
         private async Task WriteFooterAsync(Schema schema, CancellationToken cancellationToken)
@@ -126,7 +128,7 @@ namespace Apache.Arrow.Ipc
 
             cancellationToken.ThrowIfCancellationRequested();
 
-            await WriteFlatBufferAsync(cancellationToken);
+            await WriteFlatBufferAsync(cancellationToken).ConfigureAwait(false);
 
             // Write footer length
 
@@ -137,14 +139,14 @@ namespace Apache.Arrow.Ipc
                 BinaryPrimitives.WriteInt32LittleEndian(buffer,
                     Convert.ToInt32(BaseStream.Position - offset));
 
-                await BaseStream.WriteAsync(buffer, 0, 4, cancellationToken);
-            });
+                await BaseStream.WriteAsync(buffer, 0, 4, cancellationToken).ConfigureAwait(false);
+            }).ConfigureAwait(false);
 
             // Write magic
 
             cancellationToken.ThrowIfCancellationRequested();
 
-            await WriteMagicAsync();
+            await WriteMagicAsync().ConfigureAwait(false);
         }
 
         private Task WriteMagicAsync()
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs b/csharp/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
new file mode 100644
index 0000000..3656c84
--- /dev/null
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Apache.Arrow.Flatbuf;
+using FlatBuffers;
+using System;
+using System.Buffers.Binary;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow.Ipc
+{
+    internal sealed class ArrowMemoryReaderImplementation : ArrowReaderImplementation
+    {
+        private readonly ReadOnlyMemory<byte> _buffer;
+        private int _bufferPosition;
+
+        public ArrowMemoryReaderImplementation(ReadOnlyMemory<byte> buffer)
+        {
+            _buffer = buffer;
+        }
+
+        public override Task<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken)
+        {
+            cancellationToken.ThrowIfCancellationRequested();
+            return Task.FromResult(ReadNextRecordBatch());
+        }
+
+        public override RecordBatch ReadNextRecordBatch()
+        {
+            ReadSchema();
+
+            if (_buffer.Length <= _bufferPosition + sizeof(int))
+            {
+                // reached the end
+                return null;
+            }
+
+            // Get Length of record batch for message header.
+            int messageLength = BinaryPrimitives.ReadInt32LittleEndian(_buffer.Span.Slice(_bufferPosition));
+            _bufferPosition += sizeof(int);
+
+            if (messageLength == 0)
+            {
+                //reached the end
+                return null;
+            }
+
+            Message message = Message.GetRootAsMessage(
+                CreateByteBuffer(_buffer.Slice(_bufferPosition, messageLength)));
+            _bufferPosition += messageLength;
+
+            int bodyLength = (int)message.BodyLength;
+            ByteBuffer bodybb = CreateByteBuffer(_buffer.Slice(_bufferPosition, bodyLength));
+            _bufferPosition += bodyLength;
+
+            return CreateArrowObjectFromMessage(message, bodybb);
+        }
+
+        protected override ArrowBuffer CreateArrowBuffer(ReadOnlyMemory<byte> data)
+        {
+            return new ArrowBuffer(data);
+        }
+
+        private static ByteBuffer CreateByteBuffer(ReadOnlyMemory<byte> buffer)
+        {
+            return new ByteBuffer(new ReadOnlyMemoryBufferAllocator(buffer), 0);
+        }
+
+        private void ReadSchema()
+        {
+            if (HasReadSchema)
+            {
+                return;
+            }
+
+            // Figure out length of schema
+            int schemaMessageLength = BinaryPrimitives.ReadInt32LittleEndian(_buffer.Span.Slice(_bufferPosition));
+            _bufferPosition += sizeof(int);
+
+            ByteBuffer schemaBuffer = CreateByteBuffer(_buffer.Slice(_bufferPosition));
+            Schema = MessageSerializer.GetSchema(ReadMessage<Flatbuf.Schema>(schemaBuffer));
+            _bufferPosition += schemaMessageLength;
+        }
+    }
+}
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs b/csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
new file mode 100644
index 0000000..aa4f748
--- /dev/null
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using FlatBuffers;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow.Ipc
+{
+    internal abstract class ArrowReaderImplementation : IDisposable
+    {
+        public Schema Schema { get; protected set; }
+        protected bool HasReadSchema => Schema != null;
+
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        protected virtual void Dispose(bool disposing)
+        {
+        }
+
+        public abstract Task<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken);
+        public abstract RecordBatch ReadNextRecordBatch();
+
+        protected abstract ArrowBuffer CreateArrowBuffer(ReadOnlyMemory<byte> data);
+
+        protected static T ReadMessage<T>(ByteBuffer bb)
+            where T : struct, IFlatbufferObject
+        {
+            var returnType = typeof(T);
+            var msg = Flatbuf.Message.GetRootAsMessage(bb);
+
+            if (MatchEnum(msg.HeaderType, returnType))
+            {
+                return msg.Header<T>().Value;
+            }
+            else
+            {
+                throw new Exception($"Requested type '{returnType.Name}' " +
+                                    $"did not match type found at offset => '{msg.HeaderType}'");
+            }
+        }
+
+        private static bool MatchEnum(Flatbuf.MessageHeader messageHeader, Type flatBuffType)
+        {
+            switch (messageHeader)
+            {
+                case Flatbuf.MessageHeader.RecordBatch:
+                    return flatBuffType == typeof(Flatbuf.RecordBatch);
+                case Flatbuf.MessageHeader.DictionaryBatch:
+                    return flatBuffType == typeof(Flatbuf.DictionaryBatch);
+                case Flatbuf.MessageHeader.Schema:
+                    return flatBuffType == typeof(Flatbuf.Schema);
+                case Flatbuf.MessageHeader.Tensor:
+                    return flatBuffType == typeof(Flatbuf.Tensor);
+                case Flatbuf.MessageHeader.NONE:
+                    throw new ArgumentException("MessageHeader NONE has no matching flatbuf types", nameof(messageHeader));
+                default:
+                    throw new ArgumentException($"Unexpected MessageHeader value", nameof(messageHeader));
+            }
+        }
+
+        protected RecordBatch CreateArrowObjectFromMessage(Flatbuf.Message message, ByteBuffer bodyByteBuffer)
+        {
+            switch (message.HeaderType)
+            {
+                case Flatbuf.MessageHeader.Schema:
+                    // TODO: Read schema and verify equality?
+                    break;
+                case Flatbuf.MessageHeader.DictionaryBatch:
+                    // TODO: not supported currently
+                    Debug.WriteLine("Dictionaries are not yet supported.");
+                    break;
+                case Flatbuf.MessageHeader.RecordBatch:
+                    var rb = message.Header<Flatbuf.RecordBatch>().Value;
+                    List<IArrowArray> arrays = BuildArrays(Schema, bodyByteBuffer, rb);
+                    return new RecordBatch(Schema, arrays, (int)rb.Length);
+                default:
+                    // NOTE: Skip unsupported message type
+                    Debug.WriteLine($"Skipping unsupported message type '{message.HeaderType}'");
+                    break;
+            }
+
+            return null;
+        }
+
+        private List<IArrowArray> BuildArrays(
+            Schema schema,
+            ByteBuffer messageBuffer,
+            Flatbuf.RecordBatch recordBatchMessage)
+        {
+            var arrays = new List<IArrowArray>(recordBatchMessage.NodesLength);
+            int bufferIndex = 0;
+
+            for (var n = 0; n < recordBatchMessage.NodesLength; n++)
+            {
+                Field field = schema.GetFieldByIndex(n);
+                Flatbuf.FieldNode fieldNode = recordBatchMessage.Nodes(n).GetValueOrDefault();
+
+                ArrayData arrayData = field.DataType.IsFixedPrimitive() ?
+                    LoadPrimitiveField(field, fieldNode, recordBatchMessage, messageBuffer, ref bufferIndex) :
+                    LoadVariableField(field, fieldNode, recordBatchMessage, messageBuffer, ref bufferIndex);
+
+                arrays.Add(ArrowArrayFactory.BuildArray(arrayData));
+            }
+
+            return arrays;
+        }
+
+        private ArrayData LoadPrimitiveField(
+            Field field,
+            Flatbuf.FieldNode fieldNode,
+            Flatbuf.RecordBatch recordBatch,
+            ByteBuffer bodyData,
+            ref int bufferIndex)
+        {
+            var nullBitmapBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
+            var valueBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
+
+            ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, nullBitmapBuffer);
+            ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, valueBuffer);
+
+            var fieldLength = (int)fieldNode.Length;
+            var fieldNullCount = (int)fieldNode.NullCount;
+
+            if (fieldLength < 0)
+            {
+                throw new InvalidDataException("Field length must be >= 0"); // TODO:Localize exception message
+            }
+
+            if (fieldNullCount < 0)
+            {
+                throw new InvalidDataException("Null count length must be >= 0"); // TODO:Localize exception message
+            }
+
+            var arrowBuff = new[] { nullArrowBuffer, valueArrowBuffer };
+
+            return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, arrowBuff);
+        }
+
+        private ArrayData LoadVariableField(
+            Field field,
+            Flatbuf.FieldNode fieldNode,
+            Flatbuf.RecordBatch recordBatch,
+            ByteBuffer bodyData,
+            ref int bufferIndex)
+        {
+            var nullBitmapBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
+            var offsetBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
+            var valueBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
+
+            ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, nullBitmapBuffer);
+            ArrowBuffer offsetArrowBuffer = BuildArrowBuffer(bodyData, offsetBuffer);
+            ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, valueBuffer);
+
+            var fieldLength = (int)fieldNode.Length;
+            var fieldNullCount = (int)fieldNode.NullCount;
+
+            if (fieldLength < 0)
+            {
+                throw new InvalidDataException("Field length must be >= 0"); // TODO: Localize exception message
+            }
+
+            if (fieldNullCount < 0)
+            {
+                throw new InvalidDataException("Null count length must be >= 0"); //TODO: Localize exception message
+            }
+
+            var arrowBuff = new[] { nullArrowBuffer, offsetArrowBuffer, valueArrowBuffer };
+
+            return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, arrowBuff);
+        }
+
+        private ArrowBuffer BuildArrowBuffer(ByteBuffer bodyData, Flatbuf.Buffer buffer)
+        {
+            if (buffer.Length <= 0)
+            {
+                return ArrowBuffer.Empty;
+            }
+
+            int offset = (int)buffer.Offset;
+            int length = (int)buffer.Length;
+
+            var data = bodyData.ToReadOnlyMemory(offset, length);
+            return CreateArrowBuffer(data);
+        }
+    }
+}
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
index c97b7cf..b297128 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
@@ -14,282 +14,61 @@
 // limitations under the License.
 
 using System;
-using System.Buffers;
-using System.Collections.Generic;
-using System.Diagnostics;
 using System.IO;
-using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 
 namespace Apache.Arrow.Ipc
 {
+    /// <summary>
+    /// Represents a reader that can read Arrow streams.
+    /// </summary>
     public class ArrowStreamReader : IArrowReader, IDisposable
     {
-        public Schema Schema { get; protected set; }
-        public Stream BaseStream { get; }
+        private protected readonly ArrowReaderImplementation _implementation;
 
-        protected ArrayPool<byte> Buffers { get; }
+        public Schema Schema => _implementation.Schema;
 
         public ArrowStreamReader(Stream stream)
         {
-            BaseStream = stream ?? throw new ArgumentNullException(nameof(stream));
-            Buffers = ArrayPool<byte>.Create();
-            Schema = null;
-        }
-
-        protected bool HasReadSchema => Schema != null;
+            if (stream == null)
+                throw new ArgumentNullException(nameof(stream));
 
-        public virtual async Task<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
-        {
-            // TODO: Loop until a record batch is read.
-            cancellationToken.ThrowIfCancellationRequested();
-            return await ReadRecordBatchAsync(cancellationToken);
+            _implementation = new ArrowStreamReaderImplementation(stream);
         }
 
-        protected async Task<RecordBatch> ReadRecordBatchAsync(CancellationToken cancellationToken = default)
+        public ArrowStreamReader(ReadOnlyMemory<byte> buffer)
         {
-            await ReadSchemaAsync();
-
-            var bytesRead = 0;
-
-            byte[] lengthBuffer = null;
-            byte[] messageBuff = null;
-            byte[] bodyBuff = null;
-
-            try
-            {
-                // Get Length of record batch for message header.
-
-                lengthBuffer = Buffers.Rent(4);
-                bytesRead += await BaseStream.ReadAsync(lengthBuffer, 0, 4, cancellationToken);
-                var messageLength = BitConverter.ToInt32(lengthBuffer, 0);
-
-                messageBuff = Buffers.Rent(messageLength);
-                bytesRead += await BaseStream.ReadAsync(messageBuff, 0, messageLength, cancellationToken);
-                var message = Flatbuf.Message.GetRootAsMessage(new FlatBuffers.ByteBuffer(messageBuff));
-
-                bodyBuff = Buffers.Rent((int)message.BodyLength);
-                var bodybb = new FlatBuffers.ByteBuffer(bodyBuff);
-                bytesRead += await BaseStream.ReadAsync(bodyBuff, 0, (int)message.BodyLength, cancellationToken);
-
-                switch (message.HeaderType)
-                {
-                    case Flatbuf.MessageHeader.Schema:
-                        // TODO: Read schema and verify equality?
-                        break;
-                    case Flatbuf.MessageHeader.DictionaryBatch:
-                        // TODO: not supported currently
-                        Debug.WriteLine("Dictionaries are not yet supported.");
-                        break;
-                    case Flatbuf.MessageHeader.RecordBatch:
-                        var rb = message.Header<Flatbuf.RecordBatch>().Value;
-                        var arrays = BuildArrays(Schema, bodybb, rb);
-                        return new RecordBatch(Schema, arrays, (int)rb.Length);
-                    default:
-                        // NOTE: Skip unsupported message type
-                        Debug.WriteLine($"Skipping unsupported message type '{message.HeaderType}'");
-                        break;
-                }
-            }
-            finally
-            {
-                if (lengthBuffer != null)
-                {
-                    Buffers.Return(lengthBuffer);
-                }
-
-                if (messageBuff != null)
-                {
-                    Buffers.Return(messageBuff);
-                }
-
-                if (bodyBuff != null)
-                {
-                    Buffers.Return(bodyBuff);
-                }
-            }
-
-            return null;
+            _implementation = new ArrowMemoryReaderImplementation(buffer);
         }
 
-        protected virtual async Task<Schema> ReadSchemaAsync()
+        private protected ArrowStreamReader(ArrowReaderImplementation implementation)
         {
-            if (HasReadSchema)
-            {
-                return Schema;
-            }
-
-            byte[] buff = null;
-
-            try
-            {
-                // Figure out length of schema
-
-                buff = Buffers.Rent(4);
-                await BaseStream.ReadAsync(buff, 0, 4);
-                var schemaMessageLength = BitConverter.ToInt32(buff, 0);
-                Buffers.Return(buff);
-
-                // Allocate byte array for schema flat buffer
-
-                buff = Buffers.Rent(schemaMessageLength);
-                var schemabb = new FlatBuffers.ByteBuffer(buff);
-
-                // Read in schema
-
-                await BaseStream.ReadAsync(buff, 0, schemaMessageLength);
-                Schema = MessageSerializer.GetSchema(ReadMessage<Flatbuf.Schema>(schemabb));
-
-                return Schema;
-            }
-            finally
-            {
-                if (buff != null)
-                {
-                    Buffers.Return(buff);
-                }
-            }
+            _implementation = implementation;
         }
 
         public void Dispose()
         {
-            BaseStream.Dispose();
+            Dispose(true);
+            GC.SuppressFinalize(this);
         }
 
-        #region Static Helper Functions
-
-        private static IEnumerable<IArrowArray> BuildArrays(Schema schema,
-            FlatBuffers.ByteBuffer messageBuffer,
-            Flatbuf.RecordBatch recordBatchMessage)
+        protected virtual void Dispose(bool disposing)
         {
-            var arrays = new List<ArrayData>();
-            var bufferIndex = 0;
-
-            for (var n = 0; n < recordBatchMessage.NodesLength; n++)
+            if (disposing)
             {
-                var field = schema.GetFieldByIndex(n);
-                var fieldNode = recordBatchMessage.Nodes(n).GetValueOrDefault();
-
-                if (field.DataType.IsFixedPrimitive())
-                    arrays.Add(LoadPrimitiveField(field, fieldNode, recordBatchMessage, messageBuffer, ref bufferIndex));
-                else
-                    arrays.Add(LoadVariableField(field, fieldNode, recordBatchMessage, messageBuffer, ref bufferIndex));
+                _implementation.Dispose();
             }
-
-            return arrays.Select(ArrowArrayFactory.BuildArray);
         }
 
-        private static T ReadMessage<T>(FlatBuffers.ByteBuffer bb) where T : struct, FlatBuffers.IFlatbufferObject
+        public Task<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
         {
-            var returnType = typeof(T);
-            var msg = Flatbuf.Message.GetRootAsMessage(bb);
-
-            if (MatchEnum(msg.HeaderType, returnType))
-            {
-                return msg.Header<T>().Value;
-            }
-            else
-            {
-                throw new Exception($"Requested type '{returnType.Name}' " +
-                                    $"did not match type found at offset => '{msg.HeaderType}'");
-            }
-        }
-
-        private static bool MatchEnum(Flatbuf.MessageHeader messageHeader, Type flatBuffType)
-        {
-            switch (messageHeader)
-            {
-                case Flatbuf.MessageHeader.RecordBatch:
-                    return flatBuffType == typeof(Flatbuf.RecordBatch);
-                case Flatbuf.MessageHeader.DictionaryBatch:
-                    return flatBuffType == typeof(Flatbuf.DictionaryBatch);
-                case Flatbuf.MessageHeader.Schema:
-                    return flatBuffType == typeof(Flatbuf.Schema);
-                case Flatbuf.MessageHeader.Tensor:
-                    return flatBuffType == typeof(Flatbuf.Tensor);
-                case Flatbuf.MessageHeader.NONE:
-                    throw new ArgumentException("MessageHeader NONE has no matching flatbuf types", nameof(messageHeader));
-                default:
-                    throw new ArgumentException($"Unexpected MessageHeader value", nameof(messageHeader));
-            }
-        }
-
-        private static ArrowBuffer BuildArrowBuffer(FlatBuffers.ByteBuffer bodyData, Flatbuf.Buffer buffer)
-        {
-            if (buffer.Length <= 0)
-            {
-                return ArrowBuffer.Empty;
-            }
-
-            var segment = bodyData.ToArraySegment((int)buffer.Offset, (int)buffer.Length);
-
-            return new ArrowBuffer.Builder<byte>(segment.Count)
-                .Append(segment)
-                .Build();
-        }
-
-        private static ArrayData LoadPrimitiveField(Field field,
-                                  Flatbuf.FieldNode fieldNode,
-                                  Flatbuf.RecordBatch recordBatch,
-                                  FlatBuffers.ByteBuffer bodyData,
-                                  ref int bufferIndex)
-        {
-            var nullBitmapBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
-            var valueBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
-
-            ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, nullBitmapBuffer);
-            ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, valueBuffer);
-            
-            var fieldLength = (int)fieldNode.Length;
-            var fieldNullCount = (int)fieldNode.NullCount;
-
-            if (fieldLength < 0)
-            {
-                throw new InvalidDataException("Field length must be >= 0"); // TODO:Localize exception message
-            }
-
-            if (fieldNullCount < 0)
-            {
-                throw new InvalidDataException("Null count length must be >= 0"); // TODO:Localize exception message
-            }
-
-            var arrowBuff = new[] { nullArrowBuffer, valueArrowBuffer };
-
-            return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, arrowBuff);
+            return _implementation.ReadNextRecordBatchAsync(cancellationToken);
         }
 
-        private static ArrayData LoadVariableField(Field field,
-                                  Flatbuf.FieldNode fieldNode,
-                          Flatbuf.RecordBatch recordBatch,
-                          FlatBuffers.ByteBuffer bodyData,
-                          ref int bufferIndex)
+        public RecordBatch ReadNextRecordBatch()
         {
-            var nullBitmapBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
-            var offsetBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
-            var valueBuffer = recordBatch.Buffers(bufferIndex++).GetValueOrDefault();
-
-            ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, nullBitmapBuffer);
-            ArrowBuffer offsetArrowBuffer = BuildArrowBuffer(bodyData, offsetBuffer);
-            ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, valueBuffer);
-
-            var fieldLength = (int)fieldNode.Length;
-            var fieldNullCount = (int)fieldNode.NullCount;
-
-            if (fieldLength < 0)
-            {
-                throw new InvalidDataException("Field length must be >= 0"); // TODO: Localize exception message
-            }
-
-            if (fieldNullCount < 0)
-            { 
-                throw new InvalidDataException("Null count length must be >= 0"); //TODO: Localize exception message
-            }
-
-            var arrowBuff = new[] { nullArrowBuffer, offsetArrowBuffer, valueArrowBuffer };
-
-            return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, arrowBuff);
+            return _implementation.ReadNextRecordBatch();
         }
-        #endregion
     }
 }
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
new file mode 100644
index 0000000..b07521e
--- /dev/null
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Buffers;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow.Ipc
+{
+    /// <summary>Reads Arrow messages from a <see cref="Stream"/>, staging them in arrays rented from <see cref="Buffers"/>.</summary>
+    internal class ArrowStreamReaderImplementation : ArrowReaderImplementation
+    {
+        public Stream BaseStream { get; }
+        protected ArrayPool<byte> Buffers { get; }
+
+        public ArrowStreamReaderImplementation(Stream stream)
+        {
+            BaseStream = stream;
+            Buffers = ArrayPool<byte>.Create();
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+            if (disposing)
+            {
+                BaseStream.Dispose();
+            }
+        }
+
+        public override async Task<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken)
+        {
+            // TODO: Loop until a record batch is read.
+            cancellationToken.ThrowIfCancellationRequested();
+            return await ReadRecordBatchAsync(cancellationToken).ConfigureAwait(false);
+        }
+
+        public override RecordBatch ReadNextRecordBatch()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>Reads the schema if needed, then one length-prefixed message and its body.</summary>
+        /// <returns>Null when the stream ends within the 4-byte length prefix or the prefix is zero; otherwise the result of CreateArrowObjectFromMessage.</returns>
+        /// <exception cref="EndOfStreamException">The stream ends inside a message or its body.</exception>
+        protected async Task<RecordBatch> ReadRecordBatchAsync(CancellationToken cancellationToken = default)
+        {
+            await ReadSchemaAsync().ConfigureAwait(false);
+
+            byte[] lengthBuffer = null;
+            byte[] messageBuff = null;
+            byte[] bodyBuff = null;
+            try
+            {
+                // Get Length of record batch for message header.
+                lengthBuffer = Buffers.Rent(4);
+                if (await ReadFullBufferAsync(lengthBuffer, 4, cancellationToken, throwIfShort: false).ConfigureAwait(false) != 4)
+                {
+                    return null; // reached the end
+                }
+
+                var messageLength = BitConverter.ToInt32(lengthBuffer, 0);
+                if (messageLength == 0)
+                {
+                    return null; // reached the end
+                }
+
+                messageBuff = Buffers.Rent(messageLength);
+                await ReadFullBufferAsync(messageBuff, messageLength, cancellationToken, throwIfShort: true).ConfigureAwait(false);
+                var message = Flatbuf.Message.GetRootAsMessage(new FlatBuffers.ByteBuffer(messageBuff));
+
+                int bodyLength = checked((int)message.BodyLength); // throw rather than wrap if it exceeds an int
+                bodyBuff = Buffers.Rent(bodyLength);
+                var bodybb = new FlatBuffers.ByteBuffer(bodyBuff);
+                await ReadFullBufferAsync(bodyBuff, bodyLength, cancellationToken, throwIfShort: true).ConfigureAwait(false);
+                return CreateArrowObjectFromMessage(message, bodybb);
+            }
+            finally
+            {
+                ReturnBuffer(lengthBuffer);
+                ReturnBuffer(messageBuff);
+                ReturnBuffer(bodyBuff);
+            }
+        }
+
+        /// <summary>Reads the leading schema message and stores it in <c>Schema</c>; does nothing once <c>HasReadSchema</c> is true.</summary>
+        /// <exception cref="EndOfStreamException">The stream ends before the schema message is complete.</exception>
+        protected virtual async Task ReadSchemaAsync()
+        {
+            if (HasReadSchema)
+            {
+                return;
+            }
+
+            byte[] buff = null;
+            try
+            {
+                // Figure out length of schema
+                buff = Buffers.Rent(4);
+                await ReadFullBufferAsync(buff, 4, CancellationToken.None, throwIfShort: true).ConfigureAwait(false);
+                var schemaMessageLength = BitConverter.ToInt32(buff, 0);
+                // Forget the array once returned so the finally block cannot return it a second time if Rent throws
+                Buffers.Return(buff);
+                buff = null;
+                // Allocate byte array for schema flat buffer and read in schema
+                buff = Buffers.Rent(schemaMessageLength);
+                var schemabb = new FlatBuffers.ByteBuffer(buff);
+                await ReadFullBufferAsync(buff, schemaMessageLength, CancellationToken.None, throwIfShort: true).ConfigureAwait(false);
+                Schema = MessageSerializer.GetSchema(ReadMessage<Flatbuf.Schema>(schemabb));
+            }
+            finally
+            {
+                ReturnBuffer(buff);
+            }
+        }
+
+        // Reads until count bytes are in buffer or the stream ends; one ReadAsync call may return fewer bytes
+        // than requested. Returns the number of bytes read, or throws on a short read when throwIfShort is set.
+        private async Task<int> ReadFullBufferAsync(byte[] buffer, int count, CancellationToken cancellationToken, bool throwIfShort)
+        {
+            int total = 0;
+            int read = -1;
+            while (total < count && read != 0)
+            {
+                read = await BaseStream.ReadAsync(buffer, total, count - total, cancellationToken).ConfigureAwait(false);
+                total += read;
+            }
+            if (throwIfShort && total != count)
+            {
+                throw new EndOfStreamException("Unexpectedly reached the end of the stream before a full buffer was read.");
+            }
+            return total;
+        }
+
+        private void ReturnBuffer(byte[] rented)
+        {
+            if (rented != null)
+            {
+                Buffers.Return(rented);
+            }
+        }
+
+        protected override ArrowBuffer CreateArrowBuffer(ReadOnlyMemory<byte> data)
+        {
+            // need to use the Buffer.Builder because we are currently renting the memory to read messages
+            return new ArrowBuffer.Builder<byte>(data.Length)
+                .Append(data.Span)
+                .Build();
+        }
+    }
+}
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
index 6105e92..be768c1 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
@@ -174,7 +174,7 @@ namespace Apache.Arrow.Ipc
 
             if (!HasWrittenSchema)
             {
-                await WriteSchemaAsync(Schema, cancellationToken);
+                await WriteSchemaAsync(Schema, cancellationToken).ConfigureAwait(false);
                 HasWrittenSchema = true;
             }
 
@@ -229,7 +229,7 @@ namespace Apache.Arrow.Ipc
 
             await WriteMessageAsync(Flatbuf.MessageHeader.RecordBatch,
                 recordBatchOffset, recordBatchBuilder.TotalLength,
-                cancellationToken);
+                cancellationToken).ConfigureAwait(false);
 
             var metadataLength = BaseStream.Position - metadataOffset;
 
@@ -243,7 +243,7 @@ namespace Apache.Arrow.Ipc
                     continue;
 
                 
-                await WriteBufferAsync(buffers[i].DataBuffer, cancellationToken);
+                await WriteBufferAsync(buffers[i].DataBuffer, cancellationToken).ConfigureAwait(false);
             }
 
             // Write padding so the record batch message body length is a multiple of 8 bytes
@@ -251,7 +251,7 @@ namespace Apache.Arrow.Ipc
             var bodyLength = Convert.ToInt32(BaseStream.Position - lengthOffset);
             var bodyPaddingLength = CalculatePadding(bodyLength);
 
-            await WritePaddingAsync(bodyPaddingLength);
+            await WritePaddingAsync(bodyPaddingLength).ConfigureAwait(false);
 
             return new Block(
                 offset: Convert.ToInt32(metadataOffset),
@@ -272,7 +272,7 @@ namespace Apache.Arrow.Ipc
                 var span = arrowBuffer.Span;
                 buffer = ArrayPool<byte>.Shared.Rent(span.Length);
                 span.CopyTo(buffer);
-                return BaseStream.WriteAsync(buffer, 0, buffer.Length, cancellationToken);
+                return BaseStream.WriteAsync(buffer, 0, span.Length, cancellationToken);
             }
             finally
             {
@@ -323,7 +323,8 @@ namespace Apache.Arrow.Ipc
 
             // Build message
 
-            await WriteMessageAsync(Flatbuf.MessageHeader.Schema, schemaOffset, 0, cancellationToken);
+            await WriteMessageAsync(Flatbuf.MessageHeader.Schema, schemaOffset, 0, cancellationToken)
+                .ConfigureAwait(false);
 
             return schemaOffset;
         }
@@ -339,25 +340,25 @@ namespace Apache.Arrow.Ipc
 
             Builder.Finish(messageOffset.Value);
 
-            var messageData = Builder.DataBuffer.ToArraySegment(Builder.DataBuffer.Position, Builder.Offset);
-            var messagePaddingLength = CalculatePadding(messageData.Count);
+            var messageData = Builder.DataBuffer.ToReadOnlyMemory(Builder.DataBuffer.Position, Builder.Offset);
+            var messagePaddingLength = CalculatePadding(messageData.Length);
 
             await Buffers.RentReturnAsync(4, (buffer) =>
             {
-                var metadataSize = messageData.Count + messagePaddingLength;
+                var metadataSize = messageData.Length + messagePaddingLength;
                 BinaryPrimitives.WriteInt32LittleEndian(buffer, metadataSize);
                 return BaseStream.WriteAsync(buffer, 0, 4, cancellationToken);
-            });
+            }).ConfigureAwait(false);
 
-            await BaseStream.WriteAsync(messageData.Array, messageData.Offset, messageData.Count, cancellationToken);
-            await WritePaddingAsync(messagePaddingLength);
+            await BaseStream.WriteAsync(messageData, cancellationToken).ConfigureAwait(false);
+            await WritePaddingAsync(messagePaddingLength).ConfigureAwait(false);
         }
 
         private protected async Task WriteFlatBufferAsync(CancellationToken cancellationToken = default)
         {
-            var segment = Builder.DataBuffer.ToArraySegment(Builder.DataBuffer.Position, Builder.Offset);
+            var segment = Builder.DataBuffer.ToReadOnlyMemory(Builder.DataBuffer.Position, Builder.Offset);
 
-            await BaseStream.WriteAsync(segment.Array, segment.Offset, segment.Count, cancellationToken);
+            await BaseStream.WriteAsync(segment, cancellationToken).ConfigureAwait(false);
         }
 
         protected int CalculatePadding(int offset, int alignment = 8) =>
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowTypeFlatbufferBuilder.cs b/csharp/src/Apache.Arrow/Ipc/ArrowTypeFlatbufferBuilder.cs
index fe4598a..c1902d7 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowTypeFlatbufferBuilder.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowTypeFlatbufferBuilder.cs
@@ -41,6 +41,7 @@ namespace Apache.Arrow.Ipc
         }
 
         class TypeVisitor : 
+            IArrowTypeVisitor<BooleanType>,
             IArrowTypeVisitor<Int8Type>,
             IArrowTypeVisitor<Int16Type>,
             IArrowTypeVisitor<Int32Type>,
diff --git a/csharp/src/Apache.Arrow/Ipc/ReadOnlyMemoryBufferAllocator.cs b/csharp/src/Apache.Arrow/Ipc/ReadOnlyMemoryBufferAllocator.cs
new file mode 100644
index 0000000..7e78fcf
--- /dev/null
+++ b/csharp/src/Apache.Arrow/Ipc/ReadOnlyMemoryBufferAllocator.cs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using FlatBuffers;
+using System;
+
+namespace Apache.Arrow.Ipc
+{
+    /// <summary>A ByteBufferAllocator that serves reads from a caller-supplied read-only block of bytes.</summary>
+    internal sealed class ReadOnlyMemoryBufferAllocator : ByteBufferAllocator
+    {
+        private readonly ReadOnlyMemory<byte> _source;
+
+        public ReadOnlyMemoryBufferAllocator(ReadOnlyMemory<byte> buffer)
+        {
+            _source = buffer;
+            Length = buffer.Length;
+        }
+
+        public override ReadOnlyMemory<byte> ReadOnlyMemory { get { return _source; } }
+        public override ReadOnlySpan<byte> ReadOnlySpan { get { return _source.Span; } }
+        // The wrapped memory is read-only, so the writable views and GrowFront are unsupported.
+        public override Span<byte> Span { get { throw new NotSupportedException(); } }
+        public override Memory<byte> Memory { get { throw new NotSupportedException(); } }
+        public override void GrowFront(int newSize) { throw new NotSupportedException(); }
+    }
+}
diff --git a/csharp/src/Apache.Arrow/RecordBatch.cs b/csharp/src/Apache.Arrow/RecordBatch.cs
index d5a21eb..297862f 100644
--- a/csharp/src/Apache.Arrow/RecordBatch.cs
+++ b/csharp/src/Apache.Arrow/RecordBatch.cs
@@ -15,6 +15,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Diagnostics;
 using System.Linq;
 
 namespace Apache.Arrow
@@ -51,5 +52,16 @@ namespace Apache.Arrow
             Schema = schema ?? throw new ArgumentNullException(nameof(schema));
             Length = length;
         }
+
+        internal RecordBatch(Schema schema, List<IArrowArray> arrays, int length)
+        {
+            Debug.Assert(schema != null);
+            Debug.Assert(arrays != null);
+            Debug.Assert(length >= 0);
+            // No runtime validation beyond the asserts above; the caller's list is stored directly, not copied.
+            _arrays = arrays;
+            Schema = schema;
+            Length = length;
+        }
     }
 }
diff --git a/csharp/test/Apache.Arrow.Benchmarks/Apache.Arrow.Benchmarks.csproj b/csharp/test/Apache.Arrow.Benchmarks/Apache.Arrow.Benchmarks.csproj
new file mode 100644
index 0000000..f74af09
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Benchmarks/Apache.Arrow.Benchmarks.csproj
@@ -0,0 +1,19 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>netcoreapp2.1</TargetFramework>
+    <LangVersion>latest</LangVersion>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="BenchmarkDotNet" Version="0.11.3" />
+    <PackageReference Include="BenchmarkDotNet.Diagnostics.Windows" Version="0.11.3" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\..\src\Apache.Arrow\Apache.Arrow.csproj" />
+    <ProjectReference Include="..\Apache.Arrow.Tests\Apache.Arrow.Tests.csproj" />
+  </ItemGroup>
+
+</Project>
diff --git a/csharp/test/Apache.Arrow.Benchmarks/ArrowReaderBenchmark.cs b/csharp/test/Apache.Arrow.Benchmarks/ArrowReaderBenchmark.cs
new file mode 100644
index 0000000..1e3aecf
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Benchmarks/ArrowReaderBenchmark.cs
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Apache.Arrow.Ipc;
+using Apache.Arrow.Tests;
+using Apache.Arrow.Types;
+using BenchmarkDotNet.Attributes;
+using System;
+using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow.Benchmarks
+{
+    //[EtwProfiler] - needs elevated privileges
+    [MemoryDiagnoser]
+    public class ArrowReaderBenchmark
+    {
+        private MemoryStream _memoryStream; // serialized Arrow stream shared by both benchmarks; filled in GlobalSetup
+        /// <summary>Writes one sample record batch (created with length 1_000_000) into a new MemoryStream.</summary>
+        [GlobalSetup]
+        public async Task GlobalSetup()
+        {
+            RecordBatch batch = TestData.CreateSampleRecordBatch(length: 1_000_000);
+            _memoryStream = new MemoryStream();
+
+            ArrowStreamWriter writer = new ArrowStreamWriter(_memoryStream, batch.Schema);
+            await writer.WriteRecordBatchAsync(batch);
+        }
+        /// <summary>Moves the shared stream back to position 0 so the stream-based benchmark starts from the beginning.</summary>
+        [IterationSetup]
+        public void Setup()
+        {
+            _memoryStream.Position = 0;
+        }
+        /// <summary>Reads batches through the Stream constructor until the reader returns null and sums their numeric columns.</summary>
+        [Benchmark]
+        public async Task<double> ArrowReaderWithMemoryStream()
+        {
+            double sum = 0;
+            var reader = new ArrowStreamReader(_memoryStream); // NOTE(review): never disposed - presumably to keep the shared stream open; confirm
+            RecordBatch recordBatch;
+            while ((recordBatch = await reader.ReadNextRecordBatchAsync()) != null)
+            {
+                sum += SumAllNumbers(recordBatch);
+            }
+            return sum;
+        }
+        /// <summary>Same loop as above, but the reader is built directly over the stream's backing byte array.</summary>
+        [Benchmark]
+        public async Task<double> ArrowReaderWithMemory()
+        {
+            double sum = 0;
+            var reader = new ArrowStreamReader(_memoryStream.GetBuffer()); // byte[] binds to the ReadOnlyMemory<byte> constructor
+            RecordBatch recordBatch;
+            while ((recordBatch = await reader.ReadNextRecordBatchAsync()) != null)
+            {
+                sum += SumAllNumbers(recordBatch);
+            }
+            return sum;
+        }
+        /// <summary>Adds up every Int64 and Double column of the batch; columns with any other type id are skipped.</summary>
+        private static double SumAllNumbers(RecordBatch recordBatch)
+        {
+            double sum = 0;
+
+            for (int k = 0; k < recordBatch.ColumnCount; k++)
+            {
+                var array = recordBatch.Arrays.ElementAt(k);
+                switch (recordBatch.Schema.GetFieldByIndex(k).DataType.TypeId)
+                {
+                    case ArrowTypeId.Int64:
+                        Int64Array int64Array = (Int64Array)array;
+                        sum += Sum(int64Array);
+                        break;
+                    case ArrowTypeId.Double:
+                        DoubleArray doubleArray = (DoubleArray)array;
+                        sum += Sum(doubleArray);
+                        break;
+                }
+            }
+            return sum;
+        }
+        /// <summary>Sums every element of the array's Values span in index order.</summary>
+        private static double Sum(DoubleArray doubleArray)
+        {
+            double sum = 0;
+            ReadOnlySpan<double> values = doubleArray.Values;
+            for (int valueIndex = 0; valueIndex < values.Length; valueIndex++)
+            {
+                sum += values[valueIndex];
+            }
+            return sum;
+        }
+        /// <summary>Sums every element of the array's Values span into a long accumulator.</summary>
+        private static long Sum(Int64Array int64Array)
+        {
+            long sum = 0;
+            ReadOnlySpan<long> values = int64Array.Values;
+            for (int valueIndex = 0; valueIndex < values.Length; valueIndex++)
+            {
+                sum += values[valueIndex];
+            }
+            return sum;
+        }
+    }
+}
diff --git a/csharp/test/Apache.Arrow.Benchmarks/Program.cs b/csharp/test/Apache.Arrow.Benchmarks/Program.cs
new file mode 100644
index 0000000..0f1410f
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Benchmarks/Program.cs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using BenchmarkDotNet.Running;
+
+namespace Apache.Arrow.Benchmarks
+{
+    public static class Program
+    {
+        // Entry point: passes the command-line arguments to a BenchmarkSwitcher built from this assembly.
+        public static void Main(string[] args)
+        {
+            var switcher = BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly);
+            switcher.Run(args);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
new file mode 100644
index 0000000..3bd4c22
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Apache.Arrow.Ipc;
+using System;
+using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace Apache.Arrow.Tests
+{
+    public class ArrowStreamReaderTests
+    {
+        [Fact]
+        public async Task ReadRecordBatch()
+        {
+            RecordBatch expected = TestData.CreateSampleRecordBatch(length: 100);
+            // Write the batch to an in-memory stream and hand the stream's byte array to the reader.
+            // NOTE(review): GetBuffer() returns the whole backing array, which can be longer than the bytes written.
+            byte[] serialized;
+            using (var memoryStream = new MemoryStream())
+            {
+                var streamWriter = new ArrowStreamWriter(memoryStream, expected.Schema);
+                await streamWriter.WriteRecordBatchAsync(expected);
+                serialized = memoryStream.GetBuffer();
+            }
+
+            var streamReader = new ArrowStreamReader(serialized);
+            CompareBatches(expected, streamReader.ReadNextRecordBatch());
+
+            // Only one batch was written, so every further read has to come back null.
+            Assert.Null(streamReader.ReadNextRecordBatch());
+            Assert.Null(streamReader.ReadNextRecordBatch());
+        }
+
+        private void CompareBatches(RecordBatch expectedBatch, RecordBatch actualBatch)
+        {
+            // Schema and overall shape are checked before any individual column is visited.
+            CompareSchemas(expectedBatch.Schema, actualBatch.Schema);
+            Assert.Equal(expectedBatch.Length, actualBatch.Length);
+            Assert.Equal(expectedBatch.ColumnCount, actualBatch.ColumnCount);
+            for (int column = 0; column < expectedBatch.ColumnCount; column++)
+            {
+                // The comparer wraps the expected column and is then accepted by the actual one.
+                var comparer = new ArrayComparer(expectedBatch.Arrays.ElementAt(column));
+                IArrowArray actualColumn = actualBatch.Arrays.ElementAt(column);
+                actualColumn.Accept(comparer);
+            }
+        }
+
+        private void CompareSchemas(Schema expectedSchema, Schema actualSchema)
+        {
+            // TODO: compare fields once https://github.com/apache/arrow/pull/3662 is in
+            Assert.Equal(expectedSchema.Fields.Count, actualSchema.Fields.Count);
+        }
+
+        private class ArrayComparer :
+            IArrowArrayVisitor<Int8Array>,
+            IArrowArrayVisitor<Int16Array>,
+            IArrowArrayVisitor<Int32Array>,
+            IArrowArrayVisitor<Int64Array>,
+            IArrowArrayVisitor<UInt8Array>,
+            IArrowArrayVisitor<UInt16Array>,
+            IArrowArrayVisitor<UInt32Array>,
+            IArrowArrayVisitor<UInt64Array>,
+            IArrowArrayVisitor<FloatArray>,
+            IArrowArrayVisitor<DoubleArray>,
+            IArrowArrayVisitor<BooleanArray>,
+            IArrowArrayVisitor<TimestampArray>,
+            IArrowArrayVisitor<Date32Array>,
+            IArrowArrayVisitor<Date64Array>,
+            IArrowArrayVisitor<ListArray>,
+            IArrowArrayVisitor<StringArray>,
+            IArrowArrayVisitor<BinaryArray>
+        {
+            private readonly IArrowArray _expected;
+
+            /// <summary>
+            /// Captures the array that each visited (actual) array is checked against.
+            /// </summary>
+            public ArrayComparer(IArrowArray expectedArray) => _expected = expectedArray;
+
+            public void Visit(Int8Array array) => CompareArrays(array);
+            public void Visit(Int16Array array) => CompareArrays(array);
+            public void Visit(Int32Array array) => CompareArrays(array);
+            public void Visit(Int64Array array) => CompareArrays(array);
+            public void Visit(UInt8Array array) => CompareArrays(array);
+            public void Visit(UInt16Array array) => CompareArrays(array);
+            public void Visit(UInt32Array array) => CompareArrays(array);
+            public void Visit(UInt64Array array) => CompareArrays(array);
+            public void Visit(FloatArray array) => CompareArrays(array);
+            public void Visit(DoubleArray array) => CompareArrays(array);
+            public void Visit(BooleanArray array) => CompareArrays(array);
+            public void Visit(TimestampArray array) => CompareArrays(array);
+            public void Visit(Date32Array array) => CompareArrays(array);
+            public void Visit(Date64Array array) => CompareArrays(array);
+            public void Visit(ListArray array) => throw new NotImplementedException();
+            public void Visit(StringArray array) => throw new NotImplementedException();
+            public void Visit(BinaryArray array) => throw new NotImplementedException();
+            public void Visit(IArrowArray array) => throw new NotImplementedException();
+
+            private void CompareArrays<T>(PrimitiveArray<T> actualArray)
+                where T : struct, IEquatable<T>
+            {
+                // IsAssignableFrom asserts the expected array's type and hands back the typed reference.
+                var expected = Assert.IsAssignableFrom<PrimitiveArray<T>>(_expected);
+                Assert.Equal(expected.Length, actualArray.Length);
+                Assert.Equal(expected.NullCount, actualArray.NullCount);
+                Assert.Equal(expected.Offset, actualArray.Offset);
+                Assert.True(expected.NullBitmapBuffer.Span.SequenceEqual(actualArray.NullBitmapBuffer.Span));
+                // Value spans are trimmed to each array's Length before the element-wise comparison.
+                var expectedValues = expected.Values.Slice(0, expected.Length);
+                Assert.True(expectedValues.SequenceEqual(actualArray.Values.Slice(0, actualArray.Length)));
+            }
+        }
+    }
+}
diff --git a/csharp/test/Apache.Arrow.Tests/TestData.cs b/csharp/test/Apache.Arrow.Tests/TestData.cs
new file mode 100644
index 0000000..1bc046d
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Tests/TestData.cs
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Apache.Arrow.Types;
+using System;
+using System.Collections.Generic;
+
+namespace Apache.Arrow.Tests
+{
+    public static class TestData
+    {
+        /// <summary>
+        /// Builds a record batch with one non-nullable column per listed type, each holding <paramref name="length"/> values.
+        /// </summary>
+        public static RecordBatch CreateSampleRecordBatch(int length)
+        {
+            // Left out for now: Decimal(19, 2), HalfFloat, String, Date32, Date64, Time32, Time64, Timestamp.
+            ArrowType[] columnTypes =
+            {
+                BooleanType.Default,
+                UInt8Type.Default, Int8Type.Default,
+                UInt16Type.Default, Int16Type.Default,
+                UInt32Type.Default, Int32Type.Default,
+                UInt64Type.Default, Int64Type.Default,
+                FloatType.Default, DoubleType.Default,
+            };
+
+            Schema.Builder schemaBuilder = new Schema.Builder();
+            foreach (ArrowType columnType in columnTypes)
+            {
+                schemaBuilder.Field(CreateField(columnType));
+            }
+
+            Schema schema = schemaBuilder.Build();
+            IEnumerable<IArrowArray> columns = CreateArrays(schema, length);
+            return new RecordBatch(schema, columns, length);
+        }
+
+        /// <summary>Builds a non-nullable field named after its type.</summary>
+        private static Field CreateField(ArrowType type)
+            => new Field(type.Name, type, nullable: false);
+
+        /// <summary>Creates one array of <paramref name="length"/> elements for every field of <paramref name="schema"/>, in field order.</summary>
+        private static IEnumerable<IArrowArray> CreateArrays(Schema schema, int length)
+        {
+            int columnCount = schema.Fields.Count;
+            var columns = new List<IArrowArray>(columnCount);
+            for (int index = 0; index < columnCount; index++)
+            {
+                columns.Add(CreateArray(schema.GetFieldByIndex(index), length));
+            }
+            return columns;
+        }
+
+        /// <summary>Creates a single array for <paramref name="field"/> from a generated value buffer.</summary>
+        private static IArrowArray CreateArray(Field field, int length)
+        {
+            // Visiting the field's data type is what populates bufferCreator.Buffer.
+            var bufferCreator = new ArrayBufferCreator(length);
+            field.DataType.Accept(bufferCreator);
+
+            // NOTE(review): the two zeros are presumably null count and offset - confirm against ArrayData.
+            var buffers = new[] { ArrowBuffer.Empty, bufferCreator.Buffer };
+            var data = new ArrayData(field.DataType, length, 0, 0, buffers);
+            return ArrowArrayFactory.BuildArray(data);
+        }
+
+        /// <summary>
+        /// Type visitor that fills <see cref="Buffer"/> with generated values for the visited type.
+        /// </summary>
+        private class ArrayBufferCreator :
+            IArrowTypeVisitor<BooleanType>,
+            IArrowTypeVisitor<Int8Type>,
+            IArrowTypeVisitor<Int16Type>,
+            IArrowTypeVisitor<Int32Type>,
+            IArrowTypeVisitor<Int64Type>,
+            IArrowTypeVisitor<UInt8Type>,
+            IArrowTypeVisitor<UInt16Type>,
+            IArrowTypeVisitor<UInt32Type>,
+            IArrowTypeVisitor<UInt64Type>,
+            IArrowTypeVisitor<FloatType>,
+            IArrowTypeVisitor<DoubleType>
+        {
+            private readonly int _length;
+            public ArrowBuffer Buffer { get; private set; }
+
+            public ArrayBufferCreator(int length) => _length = length;
+
+            public void Visit(BooleanType type)
+            {
+                var bools = new ArrowBuffer.Builder<bool>(_length);
+                for (int index = 0; index < _length; index++)
+                {
+                    bools.Append(index % 2 == 0);
+                }
+                Buffer = bools.Build();
+            }
+
+            public void Visit(Int8Type type)
+            {
+                var values = new ArrowBuffer.Builder<sbyte>(_length);
+                for (int index = 0; index < _length; index++)
+                {
+                    values.Append((sbyte)index);
+                }
+                Buffer = values.Build();
+            }
+
+            public void Visit(UInt8Type type)
+            {
+                var values = new ArrowBuffer.Builder<byte>(_length);
+                for (int index = 0; index < _length; index++)
+                {
+                    values.Append((byte)index);
+                }
+                Buffer = values.Build();
+            }
+
+            public void Visit(Int16Type type)
+            {
+                var values = new ArrowBuffer.Builder<short>(_length);
+                for (int index = 0; index < _length; index++)
+                {
+                    values.Append((short)index);
+                }
+                Buffer = values.Build();
+            }
+
+            public void Visit(UInt16Type type)
+            {
+                var values = new ArrowBuffer.Builder<ushort>(_length);
+                for (int index = 0; index < _length; index++)
+                {
+                    values.Append((ushort)index);
+                }
+                Buffer = values.Build();
+            }
+
+            public void Visit(Int32Type type) => CreateNumberArray<int>(type);
+            public void Visit(UInt32Type type) => CreateNumberArray<uint>(type);
+            public void Visit(Int64Type type) => CreateNumberArray<long>(type);
+            public void Visit(UInt64Type type) => CreateNumberArray<ulong>(type);
+            public void Visit(FloatType type) => CreateNumberArray<float>(type);
+            public void Visit(DoubleType type) => CreateNumberArray<double>(type);
+            public void Visit(IArrowType type) => throw new NotImplementedException();
+
+            private void CreateNumberArray<T>(IArrowType type)
+                where T : struct
+            {
+                var numbers = new ArrowBuffer.Builder<T>(_length);
+                for (int index = 0; index < _length; index++)
+                {
+                    numbers.Append((T)Convert.ChangeType(index, typeof(T)));
+                }
+                Buffer = numbers.Build();
+            }
+        }
+    }
+}
diff --git a/dev/release/rat_exclude_files.txt b/dev/release/rat_exclude_files.txt
index 0513927..dd6169e 100644
--- a/dev/release/rat_exclude_files.txt
+++ b/dev/release/rat_exclude_files.txt
@@ -171,6 +171,7 @@ csharp/Apache.Arrow.sln
 csharp/src/Apache.Arrow/Apache.Arrow.csproj
 csharp/src/Apache.Arrow/Properties/Resources.Designer.cs
 csharp/src/Apache.Arrow/Properties/Resources.resx
+csharp/test/Apache.Arrow.Benchmarks/Apache.Arrow.Benchmarks.csproj
 csharp/test/Apache.Arrow.Tests/Apache.Arrow.Tests.csproj
 csharp/test/Apache.Arrow.Tests/app.config
 *.html