You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/05/07 12:02:48 UTC

[arrow] branch master updated: ARROW-4503: [C#] Eliminate allocations in ArrowStreamReader when reading from a Stream

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6720c98  ARROW-4503: [C#] Eliminate allocations in ArrowStreamReader when reading from a Stream
6720c98 is described below

commit 6720c9871104bfe3da15fb0de9b78fd0aa897e0a
Author: Eric Erhardt <er...@microsoft.com>
AuthorDate: Tue May 7 12:02:31 2019 +0000

    ARROW-4503: [C#] Eliminate allocations in ArrowStreamReader when reading from a Stream
    
    When reading `RecordBatch` instances from a .NET `Stream`, the `ArrowStreamReader` class currently allocates and copies memory 3 times for the data.
    
    I've changed it to take a MemoryPool (renamed to `MemoryAllocator` in this change) in the constructor. `ArrowStreamReader` now allocates from it a single time for each `RecordBatch` it reads, copies from the `Stream` into that memory once, and then uses that memory in the returned `RecordBatch`.
    
    I've also addressed the review comments from #3736 that were deemed better handled as part of this change.
    
    Author: Eric Erhardt <er...@microsoft.com>
    
    Closes #3925 from eerhardt/ArrowReaderSync and squashes the following commits:
    
    b48d41be <Eric Erhardt> Use theories in a couple of tests.
    23497e20 <Eric Erhardt> ArrowBuffer.Builder should only allocate its current Length, not its current Capacity.
    37a9b187 <Eric Erhardt> Fix up the benchmarks.
    0b0d9d48 <Eric Erhardt> PR feedback
    bd41384f <Eric Erhardt> Respond to initial feedback.
    63e33879 <Eric Erhardt> Eliminate allocations in ArrowStreamReader when reading from a Stream
---
 csharp/src/Apache.Arrow/Arrays/Array.cs            | 14 +++++
 csharp/src/Apache.Arrow/Arrays/ArrayData.cs        | 22 +++++++-
 csharp/src/Apache.Arrow/Arrays/ListArray.cs        |  9 +++
 csharp/src/Apache.Arrow/ArrowBuffer.Builder.cs     | 20 +++----
 csharp/src/Apache.Arrow/ArrowBuffer.cs             | 31 +++++++++--
 csharp/src/Apache.Arrow/Interfaces/IArrowArray.cs  |  4 +-
 csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs     | 13 ++++-
 .../Ipc/ArrowFileReaderImplementation.cs           | 17 +++---
 .../Ipc/ArrowMemoryReaderImplementation.cs         |  7 +--
 .../Apache.Arrow/Ipc/ArrowReaderImplementation.cs  | 11 ++--
 csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs   | 15 ++++-
 .../Ipc/ArrowStreamReaderImplementation.cs         | 64 ++++++++++------------
 .../Memory/{MemoryPool.cs => MemoryAllocator.cs}   | 57 ++++---------------
 ...ativeMemoryPool.cs => NativeMemoryAllocator.cs} | 17 +++---
 .../{NativeMemory.cs => NativeMemoryManager.cs}    |  0
 csharp/src/Apache.Arrow/RecordBatch.cs             | 25 ++++++++-
 .../ArrowReaderBenchmark.cs                        | 33 ++++++++++-
 .../Apache.Arrow.Tests/ArrowBufferBuilderTests.cs  | 19 +++++--
 csharp/test/Apache.Arrow.Tests/ArrowBufferTests.cs | 25 ++++++---
 .../Apache.Arrow.Tests/ArrowFileReaderTests.cs     | 35 ++++++++++++
 .../Apache.Arrow.Tests/ArrowStreamReaderTests.cs   | 35 ++++++++++++
 ...Fixture.cs => DefaultMemoryAllocatorFixture.cs} |  8 +--
 ...MemoryPoolFixture.cs => TestMemoryAllocator.cs} | 14 ++---
 23 files changed, 335 insertions(+), 160 deletions(-)

diff --git a/csharp/src/Apache.Arrow/Arrays/Array.cs b/csharp/src/Apache.Arrow/Arrays/Array.cs
index 561547a..f9bd424 100644
--- a/csharp/src/Apache.Arrow/Arrays/Array.cs
+++ b/csharp/src/Apache.Arrow/Arrays/Array.cs
@@ -59,5 +59,19 @@ namespace Apache.Arrow
                     break;
             }
         }
+
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        protected virtual void Dispose(bool disposing)
+        {
+            if (disposing)
+            {
+                Data.Dispose();
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/csharp/src/Apache.Arrow/Arrays/ArrayData.cs b/csharp/src/Apache.Arrow/Arrays/ArrayData.cs
index e2345e0..76172f3 100644
--- a/csharp/src/Apache.Arrow/Arrays/ArrayData.cs
+++ b/csharp/src/Apache.Arrow/Arrays/ArrayData.cs
@@ -14,12 +14,13 @@
 // limitations under the License.
 
 using Apache.Arrow.Types;
+using System;
 using System.Collections.Generic;
 using System.Linq;
 
 namespace Apache.Arrow
 {
-    public sealed class ArrayData
+    public sealed class ArrayData : IDisposable
     {
         public readonly IArrowType DataType;
         public readonly int Length;
@@ -53,5 +54,24 @@ namespace Apache.Arrow
             Buffers = buffers;
             Children = children;
         }
+
+        public void Dispose()
+        {
+            if (Buffers != null)
+            {
+                foreach (ArrowBuffer buffer in Buffers)
+                {
+                    buffer.Dispose();
+                }
+            }
+
+            if (Children != null)
+            {
+                foreach (ArrayData child in Children)
+                {
+                    child?.Dispose();
+                }
+            }
+        }
     }
 }
diff --git a/csharp/src/Apache.Arrow/Arrays/ListArray.cs b/csharp/src/Apache.Arrow/Arrays/ListArray.cs
index 3540f5a..6b037ce 100644
--- a/csharp/src/Apache.Arrow/Arrays/ListArray.cs
+++ b/csharp/src/Apache.Arrow/Arrays/ListArray.cs
@@ -54,5 +54,14 @@ namespace Apache.Arrow
             var offsets = ValueOffsets;
             return offsets[index + 1] - offsets[index];
         }
+
+        protected override void Dispose(bool disposing)
+        {
+            if (disposing)
+            {
+                Values?.Dispose();
+            }
+            base.Dispose(disposing);
+        }
     }
 }
diff --git a/csharp/src/Apache.Arrow/ArrowBuffer.Builder.cs b/csharp/src/Apache.Arrow/ArrowBuffer.Builder.cs
index 36c2bc2..c64b7d5 100644
--- a/csharp/src/Apache.Arrow/ArrowBuffer.Builder.cs
+++ b/csharp/src/Apache.Arrow/ArrowBuffer.Builder.cs
@@ -109,20 +109,20 @@ namespace Apache.Arrow
                 return this;
             }
 
-            public ArrowBuffer Build(MemoryPool pool = default)
+            public ArrowBuffer Build(MemoryAllocator allocator = default)
             {
-                int length;
-                checked
-                {
-                    length = (int)BitUtility.RoundUpToMultipleOf64(_buffer.Length);
-                }
+                int currentBytesLength = Length * _size;
+                int bufferLength = checked((int)BitUtility.RoundUpToMultipleOf64(currentBytesLength));
 
-                var memoryPool = pool ?? MemoryPool.Default.Value;
-                var memory = memoryPool.Allocate(length);
+                var memoryAllocator = allocator ?? MemoryAllocator.Default.Value;
+                var memoryOwner = memoryAllocator.Allocate(bufferLength);
 
-                Memory.CopyTo(memory);
+                if (memoryOwner != null)
+                {
+                    Memory.Slice(0, currentBytesLength).CopyTo(memoryOwner.Memory);
+                }
 
-                return new ArrowBuffer(memory);
+                return new ArrowBuffer(memoryOwner);
             }
 
             private Span<T> EnsureCapacity(int len)
diff --git a/csharp/src/Apache.Arrow/ArrowBuffer.cs b/csharp/src/Apache.Arrow/ArrowBuffer.cs
index aa9dc8e..f8e6759 100644
--- a/csharp/src/Apache.Arrow/ArrowBuffer.cs
+++ b/csharp/src/Apache.Arrow/ArrowBuffer.cs
@@ -14,21 +14,37 @@
 // limitations under the License.
 
 using System;
+using System.Buffers;
 using System.Runtime.CompilerServices;
 using Apache.Arrow.Memory;
 
 namespace Apache.Arrow
 {
-    public readonly partial struct ArrowBuffer: IEquatable<ArrowBuffer>
+    public readonly partial struct ArrowBuffer : IEquatable<ArrowBuffer>, IDisposable
     {
+        private readonly IMemoryOwner<byte> _memoryOwner;
+        private readonly ReadOnlyMemory<byte> _memory;
+
         public static ArrowBuffer Empty => new ArrowBuffer(Memory<byte>.Empty);
 
         public ArrowBuffer(ReadOnlyMemory<byte> data)
         {
-            Memory = data;
+            _memoryOwner = null;
+            _memory = data;
+        }
+
+        internal ArrowBuffer(IMemoryOwner<byte> memoryOwner)
+        {
+            // When wrapping an IMemoryOwner, don't cache the Memory<byte>
+            // since the owner may be disposed, and the cached Memory would
+            // be invalid.
+
+            _memoryOwner = memoryOwner;
+            _memory = Memory<byte>.Empty;
         }
 
-        public ReadOnlyMemory<byte> Memory { get; }
+        public ReadOnlyMemory<byte> Memory =>
+            _memoryOwner != null ? _memoryOwner.Memory : _memory;
 
         public bool IsEmpty => Memory.IsEmpty;
 
@@ -40,16 +56,21 @@ namespace Apache.Arrow
             get => Memory.Span;
         }
 
-        public ArrowBuffer Clone(MemoryPool pool = default)
+        public ArrowBuffer Clone(MemoryAllocator allocator = default)
         {
             return new Builder<byte>(Span.Length)
                 .Append(Span)
-                .Build(pool);
+                .Build(allocator);
         }
 
         public bool Equals(ArrowBuffer other)
         {
             return Span.SequenceEqual(other.Span);
         }
+
+        public void Dispose()
+        {
+            _memoryOwner?.Dispose();
+        }
     }
 }
diff --git a/csharp/src/Apache.Arrow/Interfaces/IArrowArray.cs b/csharp/src/Apache.Arrow/Interfaces/IArrowArray.cs
index 6a8f035..50fbc3a 100644
--- a/csharp/src/Apache.Arrow/Interfaces/IArrowArray.cs
+++ b/csharp/src/Apache.Arrow/Interfaces/IArrowArray.cs
@@ -13,9 +13,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+using System;
+
 namespace Apache.Arrow
 {
-    public interface IArrowArray
+    public interface IArrowArray : IDisposable
     {
         bool IsNull(int index);
 
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs b/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
index b61c2d9..e0064be 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
@@ -13,6 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+using Apache.Arrow.Memory;
 using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
@@ -34,8 +35,18 @@ namespace Apache.Arrow.Ipc
         {
         }
 
+        public ArrowFileReader(Stream stream, MemoryAllocator allocator)
+            : this(stream, allocator, leaveOpen: false)
+        {
+        }
+
         public ArrowFileReader(Stream stream, bool leaveOpen)
-            : base(new ArrowFileReaderImplementation(stream, leaveOpen))
+            : this(stream, allocator: null, leaveOpen)
+        {
+        }
+
+        public ArrowFileReader(Stream stream, MemoryAllocator allocator, bool leaveOpen)
+            : base(new ArrowFileReaderImplementation(stream, allocator, leaveOpen))
         {
         }
 
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs b/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
index 370ac6d..7877773 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
@@ -13,7 +13,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+using Apache.Arrow.Memory;
 using System;
+using System.Buffers;
 using System.IO;
 using System.Linq;
 using System.Threading;
@@ -38,7 +40,8 @@ namespace Apache.Arrow.Ipc
 
         private ArrowFooter _footer;
 
-        public ArrowFileReaderImplementation(Stream stream, bool leaveOpen) : base(stream, leaveOpen)
+        public ArrowFileReaderImplementation(Stream stream, MemoryAllocator allocator, bool leaveOpen)
+            : base(stream, allocator, leaveOpen)
         {
         }
 
@@ -62,7 +65,7 @@ namespace Apache.Arrow.Ipc
             await ValidateFileAsync().ConfigureAwait(false);
 
             int footerLength = 0;
-            await Buffers.RentReturnAsync(4, async (buffer) =>
+            await ArrayPool<byte>.Shared.RentReturnAsync(4, async (buffer) =>
             {
                 BaseStream.Position = GetFooterLengthPosition();
 
@@ -72,7 +75,7 @@ namespace Apache.Arrow.Ipc
                 footerLength = ReadFooterLength(buffer);
             }).ConfigureAwait(false);
 
-            await Buffers.RentReturnAsync(footerLength, async (buffer) =>
+            await ArrayPool<byte>.Shared.RentReturnAsync(footerLength, async (buffer) =>
             {
                 _footerStartPostion = (int)GetFooterLengthPosition() - footerLength;
 
@@ -95,7 +98,7 @@ namespace Apache.Arrow.Ipc
             ValidateFile();
 
             int footerLength = 0;
-            Buffers.RentReturn(4, (buffer) =>
+            ArrayPool<byte>.Shared.RentReturn(4, (buffer) =>
             {
                 BaseStream.Position = GetFooterLengthPosition();
 
@@ -105,7 +108,7 @@ namespace Apache.Arrow.Ipc
                 footerLength = ReadFooterLength(buffer);
             });
 
-            Buffers.RentReturn(footerLength, (buffer) =>
+            ArrayPool<byte>.Shared.RentReturn(footerLength, (buffer) =>
             {
                 _footerStartPostion = (int)GetFooterLengthPosition() - footerLength;
 
@@ -241,7 +244,7 @@ namespace Apache.Arrow.Ipc
 
             try
             {
-                await Buffers.RentReturnAsync(magicLength, async (buffer) =>
+                await ArrayPool<byte>.Shared.RentReturnAsync(magicLength, async (buffer) =>
                 {
                     // Seek to the beginning of the stream
                     BaseStream.Position = 0;
@@ -273,7 +276,7 @@ namespace Apache.Arrow.Ipc
 
             try
             {
-                Buffers.RentReturn(magicLength, buffer =>
+                ArrayPool<byte>.Shared.RentReturn(magicLength, buffer =>
                 {
                     // Seek to the beginning of the stream
                     BaseStream.Position = 0;
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs b/csharp/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
index df8b809..a39a87b 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
@@ -66,12 +66,7 @@ namespace Apache.Arrow.Ipc
             ByteBuffer bodybb = CreateByteBuffer(_buffer.Slice(_bufferPosition, bodyLength));
             _bufferPosition += bodyLength;
 
-            return CreateArrowObjectFromMessage(message, bodybb);
-        }
-
-        protected override ArrowBuffer CreateArrowBuffer(ReadOnlyMemory<byte> data)
-        {
-            return new ArrowBuffer(data);
+            return CreateArrowObjectFromMessage(message, bodybb, memoryOwner: null);
         }
 
         private void ReadSchema()
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs b/csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
index e5e9802..9bbb7ed 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
@@ -15,10 +15,10 @@
 
 using FlatBuffers;
 using System;
+using System.Buffers;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.IO;
-using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -42,8 +42,6 @@ namespace Apache.Arrow.Ipc
         public abstract ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken);
         public abstract RecordBatch ReadNextRecordBatch();
 
-        protected abstract ArrowBuffer CreateArrowBuffer(ReadOnlyMemory<byte> data);
-
         protected static T ReadMessage<T>(ByteBuffer bb)
             where T : struct, IFlatbufferObject
         {
@@ -80,7 +78,8 @@ namespace Apache.Arrow.Ipc
             }
         }
 
-        protected RecordBatch CreateArrowObjectFromMessage(Flatbuf.Message message, ByteBuffer bodyByteBuffer)
+        protected RecordBatch CreateArrowObjectFromMessage(
+            Flatbuf.Message message, ByteBuffer bodyByteBuffer, IMemoryOwner<byte> memoryOwner)
         {
             switch (message.HeaderType)
             {
@@ -94,7 +93,7 @@ namespace Apache.Arrow.Ipc
                 case Flatbuf.MessageHeader.RecordBatch:
                     var rb = message.Header<Flatbuf.RecordBatch>().Value;
                     List<IArrowArray> arrays = BuildArrays(Schema, bodyByteBuffer, rb);
-                    return new RecordBatch(Schema, arrays, (int)rb.Length);
+                    return new RecordBatch(Schema, memoryOwner, arrays, (int)rb.Length);
                 default:
                     // NOTE: Skip unsupported message type
                     Debug.WriteLine($"Skipping unsupported message type '{message.HeaderType}'");
@@ -207,7 +206,7 @@ namespace Apache.Arrow.Ipc
             int length = (int)buffer.Length;
 
             var data = bodyData.ToReadOnlyMemory(offset, length);
-            return CreateArrowBuffer(data);
+            return new ArrowBuffer(data);
         }
     }
 }
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
index 7357a11..5e7d7be 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
@@ -13,6 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+using Apache.Arrow.Memory;
 using System;
 using System.IO;
 using System.Threading;
@@ -30,16 +31,26 @@ namespace Apache.Arrow.Ipc
         public Schema Schema => _implementation.Schema;
 
         public ArrowStreamReader(Stream stream)
-            : this(stream, leaveOpen: false)
+            : this(stream, allocator: null, leaveOpen: false)
+        {
+        }
+
+        public ArrowStreamReader(Stream stream, MemoryAllocator allocator)
+            : this(stream, allocator, leaveOpen: false)
         {
         }
 
         public ArrowStreamReader(Stream stream, bool leaveOpen)
+            : this(stream, allocator: null, leaveOpen)
+        {
+        }
+
+        public ArrowStreamReader(Stream stream, MemoryAllocator allocator, bool leaveOpen)
         {
             if (stream == null)
                 throw new ArgumentNullException(nameof(stream));
 
-            _implementation = new ArrowStreamReaderImplementation(stream, leaveOpen);
+            _implementation = new ArrowStreamReaderImplementation(stream, allocator, leaveOpen);
         }
 
         public ArrowStreamReader(ReadOnlyMemory<byte> buffer)
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
index 6ca0518..c2ac4bb 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
@@ -13,6 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+using Apache.Arrow.Memory;
 using System;
 using System.Buffers;
 using System.IO;
@@ -24,14 +25,14 @@ namespace Apache.Arrow.Ipc
     internal class ArrowStreamReaderImplementation : ArrowReaderImplementation
     {
         public Stream BaseStream { get; }
-        protected ArrayPool<byte> Buffers { get; }
         private readonly bool _leaveOpen;
+        private readonly MemoryAllocator _allocator;
 
-        public ArrowStreamReaderImplementation(Stream stream, bool leaveOpen)
+        public ArrowStreamReaderImplementation(Stream stream, MemoryAllocator allocator, bool leaveOpen)
         {
             BaseStream = stream;
+            _allocator = allocator ?? MemoryAllocator.Default.Value;
             _leaveOpen = leaveOpen;
-            Buffers = ArrayPool<byte>.Create();
         }
 
         protected override void Dispose(bool disposing)
@@ -59,7 +60,7 @@ namespace Apache.Arrow.Ipc
             await ReadSchemaAsync().ConfigureAwait(false);
 
             int messageLength = 0;
-            await Buffers.RentReturnAsync(4, async (lengthBuffer) =>
+            await ArrayPool<byte>.Shared.RentReturnAsync(4, async (lengthBuffer) =>
             {
                 // Get Length of record batch for message header.
                 int bytesRead = await BaseStream.ReadFullBufferAsync(lengthBuffer, cancellationToken)
@@ -78,7 +79,7 @@ namespace Apache.Arrow.Ipc
             }
 
             RecordBatch result = null;
-            await Buffers.RentReturnAsync(messageLength, async (messageBuff) =>
+            await ArrayPool<byte>.Shared.RentReturnAsync(messageLength, async (messageBuff) =>
             {
                 int bytesRead = await BaseStream.ReadFullBufferAsync(messageBuff, cancellationToken)
                     .ConfigureAwait(false);
@@ -86,15 +87,16 @@ namespace Apache.Arrow.Ipc
 
                 Flatbuf.Message message = Flatbuf.Message.GetRootAsMessage(CreateByteBuffer(messageBuff));
 
-                await Buffers.RentReturnAsync((int)message.BodyLength, async (bodyBuff) =>
-                {
-                    int bodyBytesRead = await BaseStream.ReadFullBufferAsync(bodyBuff, cancellationToken)
-                        .ConfigureAwait(false);
-                    EnsureFullRead(bodyBuff, bodyBytesRead);
+                int bodyLength = checked((int)message.BodyLength);
+
+                IMemoryOwner<byte> bodyBuffOwner = _allocator.Allocate(bodyLength);
+                Memory<byte> bodyBuff = bodyBuffOwner?.Memory.Slice(0, bodyLength) ?? Memory<byte>.Empty;
+                bytesRead = await BaseStream.ReadFullBufferAsync(bodyBuff, cancellationToken)
+                    .ConfigureAwait(false);
+                EnsureFullRead(bodyBuff, bytesRead);
 
-                    FlatBuffers.ByteBuffer bodybb = CreateByteBuffer(bodyBuff);
-                    result = CreateArrowObjectFromMessage(message, bodybb);
-                }).ConfigureAwait(false);
+                FlatBuffers.ByteBuffer bodybb = CreateByteBuffer(bodyBuff);
+                result = CreateArrowObjectFromMessage(message, bodybb, bodyBuffOwner);
             }).ConfigureAwait(false);
 
             return result;
@@ -105,7 +107,7 @@ namespace Apache.Arrow.Ipc
             ReadSchema();
 
             int messageLength = 0;
-            Buffers.RentReturn(4, lengthBuffer =>
+            ArrayPool<byte>.Shared.RentReturn(4, lengthBuffer =>
             {
                 int bytesRead = BaseStream.ReadFullBuffer(lengthBuffer);
 
@@ -122,21 +124,22 @@ namespace Apache.Arrow.Ipc
             }
 
             RecordBatch result = null;
-            Buffers.RentReturn(messageLength, messageBuff =>
+            ArrayPool<byte>.Shared.RentReturn(messageLength, messageBuff =>
             {
                 int bytesRead = BaseStream.ReadFullBuffer(messageBuff);
                 EnsureFullRead(messageBuff, bytesRead);
 
                 Flatbuf.Message message = Flatbuf.Message.GetRootAsMessage(CreateByteBuffer(messageBuff));
 
-                Buffers.RentReturn((int)message.BodyLength, bodyBuff =>
-                {
-                    int bodyBytesRead = BaseStream.ReadFullBuffer(bodyBuff);
-                    EnsureFullRead(bodyBuff, bodyBytesRead);
+                int bodyLength = checked((int)message.BodyLength);
 
-                    FlatBuffers.ByteBuffer bodybb = CreateByteBuffer(bodyBuff);
-                    result = CreateArrowObjectFromMessage(message, bodybb);
-                });
+                IMemoryOwner<byte> bodyBuffOwner = _allocator.Allocate(bodyLength);
+                Memory<byte> bodyBuff = bodyBuffOwner?.Memory.Slice(0, bodyLength) ?? Memory<byte>.Empty;
+                bytesRead = BaseStream.ReadFullBuffer(bodyBuff);
+                EnsureFullRead(bodyBuff, bytesRead);
+
+                FlatBuffers.ByteBuffer bodybb = CreateByteBuffer(bodyBuff);
+                result = CreateArrowObjectFromMessage(message, bodybb, bodyBuffOwner);
             });
 
             return result;
@@ -151,7 +154,7 @@ namespace Apache.Arrow.Ipc
 
             // Figure out length of schema
             int schemaMessageLength = 0;
-            await Buffers.RentReturnAsync(4, async (lengthBuffer) =>
+            await ArrayPool<byte>.Shared.RentReturnAsync(4, async (lengthBuffer) =>
             {
                 int bytesRead = await BaseStream.ReadFullBufferAsync(lengthBuffer).ConfigureAwait(false);
                 EnsureFullRead(lengthBuffer, bytesRead);
@@ -159,7 +162,7 @@ namespace Apache.Arrow.Ipc
                 schemaMessageLength = BitUtility.ReadInt32(lengthBuffer);
             }).ConfigureAwait(false);
 
-            await Buffers.RentReturnAsync(schemaMessageLength, async (buff) =>
+            await ArrayPool<byte>.Shared.RentReturnAsync(schemaMessageLength, async (buff) =>
             {
                 // Read in schema
                 int bytesRead = await BaseStream.ReadFullBufferAsync(buff).ConfigureAwait(false);
@@ -179,7 +182,7 @@ namespace Apache.Arrow.Ipc
 
             // Figure out length of schema
             int schemaMessageLength = 0;
-            Buffers.RentReturn(4, lengthBuffer =>
+            ArrayPool<byte>.Shared.RentReturn(4, lengthBuffer =>
             {
                 int bytesRead = BaseStream.ReadFullBuffer(lengthBuffer);
                 EnsureFullRead(lengthBuffer, bytesRead);
@@ -187,7 +190,7 @@ namespace Apache.Arrow.Ipc
                 schemaMessageLength = BitUtility.ReadInt32(lengthBuffer);
             });
 
-            Buffers.RentReturn(schemaMessageLength, buff =>
+            ArrayPool<byte>.Shared.RentReturn(schemaMessageLength, buff =>
             {
                 int bytesRead = BaseStream.ReadFullBuffer(buff);
                 EnsureFullRead(buff, bytesRead);
@@ -197,15 +200,6 @@ namespace Apache.Arrow.Ipc
             });
         }
 
-        protected override ArrowBuffer CreateArrowBuffer(ReadOnlyMemory<byte> data)
-        {
-            // need to use the Buffer.Builder because we are currently renting the memory to
-            // read messages
-            return new ArrowBuffer.Builder<byte>(data.Length)
-                .Append(data.Span)
-                .Build();
-        }
-
         /// <summary>
         /// Ensures the number of bytes read matches the buffer length
         /// and throws an exception it if doesn't. This ensures we have read
diff --git a/csharp/src/Apache.Arrow/Memory/MemoryPool.cs b/csharp/src/Apache.Arrow/Memory/MemoryAllocator.cs
similarity index 54%
rename from csharp/src/Apache.Arrow/Memory/MemoryPool.cs
rename to csharp/src/Apache.Arrow/Memory/MemoryAllocator.cs
index 569ca74..c793a1f 100644
--- a/csharp/src/Apache.Arrow/Memory/MemoryPool.cs
+++ b/csharp/src/Apache.Arrow/Memory/MemoryAllocator.cs
@@ -14,16 +14,16 @@
 // limitations under the License.
 
 using System;
+using System.Buffers;
 using System.Threading;
 
 namespace Apache.Arrow.Memory
 {
-
-    public abstract class MemoryPool
+    public abstract class MemoryAllocator
     {
         public const int DefaultAlignment = 64;
 
-        public static Lazy<MemoryPool> Default { get; } = new Lazy<MemoryPool>(BuildDefault, true);
+        public static Lazy<MemoryAllocator> Default { get; } = new Lazy<MemoryAllocator>(BuildDefault, true);
 
         public class Stats
         {
@@ -44,13 +44,13 @@ namespace Apache.Arrow.Memory
 
         protected int Alignment { get; }
 
-        protected MemoryPool(int alignment = DefaultAlignment)
+        protected MemoryAllocator(int alignment = DefaultAlignment)
         {
             Statistics = new Stats();
             Alignment = alignment;
         }
 
-        public Memory<byte> Allocate(int length)
+        public IMemoryOwner<byte> Allocate(int length)
         {
             if (length < 0)
             {
@@ -59,56 +59,21 @@ namespace Apache.Arrow.Memory
 
             if (length == 0)
             {
-                return Memory<byte>.Empty;
+                return null;
             }
 
-            var memory = AllocateInternal(length, out var bytesAllocated);
+            var memoryOwner = AllocateInternal(length, out var bytesAllocated);
 
             Statistics.Allocate(bytesAllocated);
 
-            // Ensure all allocated memory is zeroed.
-
-            ZeroMemory(memory.Span);
-            
-            return memory;
-        }
-
-        public Memory<byte> Reallocate(Memory<byte> memory, int length)
-        {
-            if (length < 0)
-            {
-                throw new ArgumentOutOfRangeException(nameof(length));
-            }
-
-            if (length == 0)
-            {
-                return Memory<byte>.Empty;
-            }
-
-            var buffer = ReallocateInternal(memory, length, out var bytesAllocated);
-
-            Statistics.Allocate(bytesAllocated);
-
-            if (length > memory.Length)
-            {
-                ZeroMemory(buffer.Span.Slice(
-                    memory.Length, length - memory.Length));
-            }
-
-            return buffer;
-        }
-
-        private static void ZeroMemory(Span<byte> span)
-        {
-            span.Fill(0);
+            return memoryOwner;
         }
 
-        private static MemoryPool BuildDefault()
+        private static MemoryAllocator BuildDefault()
         {
-            return new NativeMemoryPool(DefaultAlignment);
+            return new NativeMemoryAllocator(DefaultAlignment);
         }
 
-        protected abstract Memory<byte> AllocateInternal(int length, out int bytesAllocated);
-        protected abstract Memory<byte> ReallocateInternal(Memory<byte> memory, int length, out int bytesAllocated);
+        protected abstract IMemoryOwner<byte> AllocateInternal(int length, out int bytesAllocated);
     }
 }
diff --git a/csharp/src/Apache.Arrow/Memory/NativeMemoryPool.cs b/csharp/src/Apache.Arrow/Memory/NativeMemoryAllocator.cs
similarity index 75%
rename from csharp/src/Apache.Arrow/Memory/NativeMemoryPool.cs
rename to csharp/src/Apache.Arrow/Memory/NativeMemoryAllocator.cs
index 2ea07ce..bfbb223 100644
--- a/csharp/src/Apache.Arrow/Memory/NativeMemoryPool.cs
+++ b/csharp/src/Apache.Arrow/Memory/NativeMemoryAllocator.cs
@@ -14,16 +14,17 @@
 // limitations under the License.
 
 using System;
+using System.Buffers;
 using System.Runtime.InteropServices;
 
 namespace Apache.Arrow.Memory
 {
-    public class NativeMemoryPool : MemoryPool
+    public class NativeMemoryAllocator : MemoryAllocator
     {
-        public NativeMemoryPool(int alignment = DefaultAlignment) 
+        public NativeMemoryAllocator(int alignment = DefaultAlignment) 
             : base(alignment) { }
 
-        protected override Memory<byte> AllocateInternal(int length, out int bytesAllocated)
+        protected override IMemoryOwner<byte> AllocateInternal(int length, out int bytesAllocated)
         {
             // TODO: Ensure memory is released if exception occurs.
 
@@ -41,14 +42,10 @@ namespace Apache.Arrow.Memory
 
             GC.AddMemoryPressure(bytesAllocated);
 
-            return manager.Memory;
-        }
+            // Ensure all allocated memory is zeroed.
+            manager.Memory.Span.Fill(0);
 
-        protected override Memory<byte> ReallocateInternal(Memory<byte> memory, int length, out int bytesAllocated)
-        {
-            var buffer = AllocateInternal(length, out bytesAllocated);
-            memory.CopyTo(buffer);
-            return buffer;
+            return manager;
         }
     }
 }
diff --git a/csharp/src/Apache.Arrow/Memory/NativeMemory.cs b/csharp/src/Apache.Arrow/Memory/NativeMemoryManager.cs
similarity index 100%
rename from csharp/src/Apache.Arrow/Memory/NativeMemory.cs
rename to csharp/src/Apache.Arrow/Memory/NativeMemoryManager.cs
diff --git a/csharp/src/Apache.Arrow/RecordBatch.cs b/csharp/src/Apache.Arrow/RecordBatch.cs
index 297862f..822cbac 100644
--- a/csharp/src/Apache.Arrow/RecordBatch.cs
+++ b/csharp/src/Apache.Arrow/RecordBatch.cs
@@ -14,19 +14,21 @@
 // limitations under the License.
 
 using System;
+using System.Buffers;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Linq;
 
 namespace Apache.Arrow
 {
-    public class RecordBatch
+    public class RecordBatch : IDisposable
     {
         public Schema Schema { get; }
         public int ColumnCount => _arrays.Count;
         public IEnumerable<IArrowArray> Arrays => _arrays;
         public int Length { get; }
 
+        private readonly IMemoryOwner<byte> _memoryOwner;
         private readonly IList<IArrowArray> _arrays;
 
         public IArrowArray Column(int i)
@@ -40,6 +42,24 @@ namespace Apache.Arrow
             return _arrays[fieldIndex];
         }
 
+        /// <summary>Disposes the batch's memory owner, if it has one, and every column array.</summary>
+        public void Dispose() => Dispose(disposing: true);
+
+        /// <summary>Releases owned resources; a no-op unless <paramref name="disposing"/> is true.</summary>
+        protected virtual void Dispose(bool disposing)
+        {
+            if (!disposing)
+            {
+                return;
+            }
+
+            _memoryOwner?.Dispose();
+            foreach (IArrowArray column in _arrays)
+            {
+                column.Dispose();
+            }
+        }
+
         public RecordBatch(Schema schema, IEnumerable<IArrowArray> data, int length)
         {
             if (length < 0)
@@ -53,12 +73,13 @@ namespace Apache.Arrow
             Length = length;
         }
 
-        internal RecordBatch(Schema schema, List<IArrowArray> arrays, int length)
+        internal RecordBatch(Schema schema, IMemoryOwner<byte> memoryOwner, List<IArrowArray> arrays, int length)
         {
             Debug.Assert(schema != null);
             Debug.Assert(arrays != null);
             Debug.Assert(length >= 0);
 
+            _memoryOwner = memoryOwner;
             _arrays = arrays;
             Schema = schema;
             Length = length;
diff --git a/csharp/test/Apache.Arrow.Benchmarks/ArrowReaderBenchmark.cs b/csharp/test/Apache.Arrow.Benchmarks/ArrowReaderBenchmark.cs
index 1e3aecf..9c1bd2f 100644
--- a/csharp/test/Apache.Arrow.Benchmarks/ArrowReaderBenchmark.cs
+++ b/csharp/test/Apache.Arrow.Benchmarks/ArrowReaderBenchmark.cs
@@ -14,6 +14,7 @@
 // limitations under the License.
 
 using Apache.Arrow.Ipc;
+using Apache.Arrow.Memory;
 using Apache.Arrow.Tests;
 using Apache.Arrow.Types;
 using BenchmarkDotNet.Attributes;
@@ -28,12 +29,16 @@ namespace Apache.Arrow.Benchmarks
     [MemoryDiagnoser]
     public class ArrowReaderBenchmark
     {
+        [Params(10_000, 1_000_000)]
+        public int Count { get; set; }
+
         private MemoryStream _memoryStream;
+        private static readonly MemoryAllocator s_allocator = new TestMemoryAllocator();
 
         [GlobalSetup]
         public async Task GlobalSetup()
         {
-            RecordBatch batch = TestData.CreateSampleRecordBatch(length: 1_000_000);
+            RecordBatch batch = TestData.CreateSampleRecordBatch(length: Count);
             _memoryStream = new MemoryStream();
 
             ArrowStreamWriter writer = new ArrowStreamWriter(_memoryStream, batch.Schema);
@@ -54,7 +59,26 @@ namespace Apache.Arrow.Benchmarks
             RecordBatch recordBatch;
             while ((recordBatch = await reader.ReadNextRecordBatchAsync()) != null)
             {
-                sum += SumAllNumbers(recordBatch);
+                using (recordBatch)
+                {
+                    sum += SumAllNumbers(recordBatch);
+                }
+            }
+            return sum;
+        }
+
+        [Benchmark]
+        public async Task<double> ArrowReaderWithMemoryStream_ManagedMemory()
+        {
+            double sum = 0;
+            var reader = new ArrowStreamReader(_memoryStream, s_allocator);
+            RecordBatch recordBatch;
+            while ((recordBatch = await reader.ReadNextRecordBatchAsync()) != null)
+            {
+                using (recordBatch)
+                {
+                    sum += SumAllNumbers(recordBatch);
+                }
             }
             return sum;
         }
@@ -67,7 +91,10 @@ namespace Apache.Arrow.Benchmarks
             RecordBatch recordBatch;
             while ((recordBatch = await reader.ReadNextRecordBatchAsync()) != null)
             {
-                sum += SumAllNumbers(recordBatch);
+                using (recordBatch)
+                {
+                    sum += SumAllNumbers(recordBatch);
+                }
             }
             return sum;
         }
diff --git a/csharp/test/Apache.Arrow.Tests/ArrowBufferBuilderTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowBufferBuilderTests.cs
index eee4d14..144baa9 100644
--- a/csharp/test/Apache.Arrow.Tests/ArrowBufferBuilderTests.cs
+++ b/csharp/test/Apache.Arrow.Tests/ArrowBufferBuilderTests.cs
@@ -155,22 +155,29 @@ namespace Apache.Arrow.Tests
 
         public class Clear
         {
-            [Fact]
-            public void SetsAllValuesToDefault()
+            [Theory]
+            [InlineData(10)]
+            [InlineData(100)]
+            public void SetsAllValuesToDefault(int sizeBeforeClear)
             {
                 var builder = new ArrowBuffer.Builder<int>(1);
-                var data = Enumerable.Range(0, 10).Select(x => x).ToArray();
+                var data = Enumerable.Range(0, sizeBeforeClear).Select(x => x).ToArray();
 
                 builder.AppendRange(data);
                 builder.Clear();
+                builder.Append(0);
 
                 var buffer = builder.Build();
-                var zeros = Enumerable.Range(0, 10).Select(x => 0).ToArray();
-                var values = buffer.Span.CastTo<int>().Slice(0, 10).ToArray();
+                // No matter the sizeBeforeClear, we only appended a single 0,
+                // so the buffer length should be the smallest possible.
+                Assert.Equal(64, buffer.Length);
+
+                // Check that all 16 int elements (64 bytes / 4 bytes per int) are default (zero).
+                var zeros = Enumerable.Range(0, 16).Select(x => 0).ToArray();
+                var values = buffer.Span.CastTo<int>().Slice(0, 16).ToArray();
 
                 Assert.True(zeros.SequenceEqual(values));
             }
         }
-
     }
 }
diff --git a/csharp/test/Apache.Arrow.Tests/ArrowBufferTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowBufferTests.cs
index dccb22e..e6fa525 100644
--- a/csharp/test/Apache.Arrow.Tests/ArrowBufferTests.cs
+++ b/csharp/test/Apache.Arrow.Tests/ArrowBufferTests.cs
@@ -23,11 +23,11 @@ namespace Apache.Arrow.Tests
     public class ArrowBufferTests
     {
         public class Allocate : 
-            IClassFixture<DefaultMemoryPoolFixture>
+            IClassFixture<DefaultMemoryAllocatorFixture>
         {
-            private readonly DefaultMemoryPoolFixture _memoryPoolFixture;
+            private readonly DefaultMemoryAllocatorFixture _memoryPoolFixture;
 
-            public Allocate(DefaultMemoryPoolFixture memoryPoolFixture)
+            public Allocate(DefaultMemoryAllocatorFixture memoryPoolFixture)
             {
                 _memoryPoolFixture = memoryPoolFixture;
             }
@@ -38,15 +38,21 @@ namespace Apache.Arrow.Tests
             /// <param name="size">number of bytes to allocate</param>
             /// <param name="expectedCapacity">expected buffer capacity after allocation</param>
             [Theory]
+            [InlineData(0, 0)]
             [InlineData(1, 64)]
             [InlineData(8, 64)]
             [InlineData(9, 64)]
             [InlineData(65, 128)]
             public void AllocatesWithExpectedPadding(int size, int expectedCapacity)
             {
-                var buffer = new ArrowBuffer.Builder<byte>(size).Build();
+                var builder = new ArrowBuffer.Builder<byte>(size);
+                for (int i = 0; i < size; i++)
+                {
+                    builder.Append(0);
+                }
+                var buffer = builder.Build();
 
-                Assert.Equal(buffer.Length, expectedCapacity);
+                Assert.Equal(expectedCapacity, buffer.Length);
             }
 
             /// <summary>
@@ -59,7 +65,12 @@ namespace Apache.Arrow.Tests
             [InlineData(128)]
             public unsafe void AllocatesAlignedToMultipleOf64(int size)
             {
-                var buffer = new ArrowBuffer.Builder<byte>(size).Build();
+                var builder = new ArrowBuffer.Builder<byte>(size);
+                for (int i = 0; i < size; i++)
+                {
+                    builder.Append(0);
+                }
+                var buffer = builder.Build();
 
                 fixed (byte* ptr = &buffer.Span.GetPinnableReference())
                 { 
@@ -73,7 +84,7 @@ namespace Apache.Arrow.Tests
             [Fact]
             public void HasZeroPadding()
             {
-                var buffer = new ArrowBuffer.Builder<byte>(10).Build();
+                var buffer = new ArrowBuffer.Builder<byte>(10).Append(0).Build();
                 
                 foreach (var b in buffer.Span)
                 {
diff --git a/csharp/test/Apache.Arrow.Tests/ArrowFileReaderTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowFileReaderTests.cs
index b756faa..8051bf4 100644
--- a/csharp/test/Apache.Arrow.Tests/ArrowFileReaderTests.cs
+++ b/csharp/test/Apache.Arrow.Tests/ArrowFileReaderTests.cs
@@ -14,6 +14,7 @@
 // limitations under the License.
 
 using Apache.Arrow.Ipc;
+using Apache.Arrow.Memory;
 using System;
 using System.IO;
 using System.Threading.Tasks;
@@ -47,6 +48,40 @@ namespace Apache.Arrow.Tests
             Assert.Equal(0, stream.Position);
         }
 
+        // Checks that the reader allocates once from the supplied allocator and that Dispose honors leaveOpen.
+        [Theory]
+        [InlineData(true)]
+        [InlineData(false)]
+        public async Task Ctor_MemoryPool_AllocatesFromPool(bool shouldLeaveOpen)
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
+            using (MemoryStream stream = new MemoryStream())
+            {
+                ArrowFileWriter writer = new ArrowFileWriter(stream, originalBatch.Schema);
+                await writer.WriteRecordBatchAsync(originalBatch);
+                await writer.WriteFooterAsync();
+                stream.Position = 0;
+
+                var allocator = new TestMemoryAllocator();
+                ArrowFileReader reader = new ArrowFileReader(stream, allocator, leaveOpen: shouldLeaveOpen);
+                // RecordBatch is IDisposable and owns the allocated memory, so release it after the checks.
+                using (reader.ReadNextRecordBatch())
+                {
+                    Assert.Equal(1, allocator.Statistics.Allocations);
+                    Assert.True(allocator.Statistics.BytesAllocated > 0);
+                }
+                reader.Dispose();
+                if (shouldLeaveOpen)
+                {
+                    Assert.True(stream.Position > 0);
+                }
+                else
+                {
+                    Assert.Throws<ObjectDisposedException>(() => stream.Position);
+                }
+            }
+        }
+
         [Fact]
         public async Task TestReadNextRecordBatch()
         {
diff --git a/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
index 0a2670c..914d91e 100644
--- a/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
+++ b/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
@@ -14,6 +14,7 @@
 // limitations under the License.
 
 using Apache.Arrow.Ipc;
+using Apache.Arrow.Memory;
 using System;
 using System.IO;
 using System.Threading;
@@ -48,6 +49,40 @@ namespace Apache.Arrow.Tests
             Assert.Equal(0, stream.Position);
         }
 
+        // Checks that the reader allocates once from the supplied allocator and that Dispose honors shouldLeaveOpen.
+        [Theory]
+        [InlineData(true)]
+        [InlineData(false)]
+        public async Task Ctor_MemoryPool_AllocatesFromPool(bool shouldLeaveOpen)
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 100);
+            using (MemoryStream stream = new MemoryStream())
+            {
+                ArrowStreamWriter writer = new ArrowStreamWriter(stream, originalBatch.Schema);
+                await writer.WriteRecordBatchAsync(originalBatch);
+                stream.Position = 0;
+
+                var allocator = new TestMemoryAllocator();
+                ArrowStreamReader reader = new ArrowStreamReader(stream, allocator, shouldLeaveOpen);
+                // RecordBatch is IDisposable and owns the allocated memory, so release it after the checks.
+                using (reader.ReadNextRecordBatch())
+                {
+                    Assert.Equal(1, allocator.Statistics.Allocations);
+                    Assert.True(allocator.Statistics.BytesAllocated > 0);
+                }
+
+                reader.Dispose();
+                if (shouldLeaveOpen)
+                {
+                    Assert.True(stream.Position > 0);
+                }
+                else
+                {
+                    Assert.Throws<ObjectDisposedException>(() => stream.Position);
+                }
+            }
+        }
+
         [Fact]
         public async Task ReadRecordBatch_Memory()
         {
diff --git a/csharp/test/Apache.Arrow.Tests/Fixtures/DefaultMemoryPoolFixture.cs b/csharp/test/Apache.Arrow.Tests/Fixtures/DefaultMemoryAllocatorFixture.cs
similarity index 81%
copy from csharp/test/Apache.Arrow.Tests/Fixtures/DefaultMemoryPoolFixture.cs
copy to csharp/test/Apache.Arrow.Tests/Fixtures/DefaultMemoryAllocatorFixture.cs
index 3b867cd..276caf1 100644
--- a/csharp/test/Apache.Arrow.Tests/Fixtures/DefaultMemoryPoolFixture.cs
+++ b/csharp/test/Apache.Arrow.Tests/Fixtures/DefaultMemoryAllocatorFixture.cs
@@ -17,15 +17,15 @@ using Apache.Arrow.Memory;
 
 namespace Apache.Arrow.Tests.Fixtures
 {
-    public class DefaultMemoryPoolFixture
+    public class DefaultMemoryAllocatorFixture
     {
-        public MemoryPool MemoryPool { get; }
+        public MemoryAllocator MemoryAllocator { get; }
 
-        public DefaultMemoryPoolFixture()
+        public DefaultMemoryAllocatorFixture()
         {
             const int alignment = 64;
 
-            MemoryPool = new NativeMemoryPool(alignment);
+            MemoryAllocator = new NativeMemoryAllocator(alignment);
         }
     }
 }
diff --git a/csharp/test/Apache.Arrow.Tests/Fixtures/DefaultMemoryPoolFixture.cs b/csharp/test/Apache.Arrow.Tests/TestMemoryAllocator.cs
similarity index 74%
rename from csharp/test/Apache.Arrow.Tests/Fixtures/DefaultMemoryPoolFixture.cs
rename to csharp/test/Apache.Arrow.Tests/TestMemoryAllocator.cs
index 3b867cd..e0e36af 100644
--- a/csharp/test/Apache.Arrow.Tests/Fixtures/DefaultMemoryPoolFixture.cs
+++ b/csharp/test/Apache.Arrow.Tests/TestMemoryAllocator.cs
@@ -14,18 +14,16 @@
 // limitations under the License.
 
 using Apache.Arrow.Memory;
+using System.Buffers;
 
-namespace Apache.Arrow.Tests.Fixtures
+namespace Apache.Arrow.Tests
 {
-    public class DefaultMemoryPoolFixture
+    public class TestMemoryAllocator : MemoryAllocator
     {
-        public MemoryPool MemoryPool { get; }
-
-        public DefaultMemoryPoolFixture()
+        protected override IMemoryOwner<byte> AllocateInternal(int length, out int bytesAllocated)
         {
-            const int alignment = 64;
-
-            MemoryPool = new NativeMemoryPool(alignment);
+            bytesAllocated = length;
+            return MemoryPool<byte>.Shared.Rent(length);
         }
     }
 }