You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by wj...@apache.org on 2023/01/26 19:30:28 UTC

[arrow] branch master updated: GH-32240: [C#] Support decompression of IPC format buffers (#33603)

This is an automated email from the ASF dual-hosted git repository.

wjones127 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new b7fd793975 GH-32240: [C#] Support decompression of IPC format buffers (#33603)
b7fd793975 is described below

commit b7fd7939759084c9714342f25259142bb18ddf36
Author: Adam Reeve <ad...@gmail.com>
AuthorDate: Fri Jan 27 08:30:15 2023 +1300

    GH-32240: [C#] Support decompression of IPC format buffers (#33603)
    
    # Which issue does this PR close?
    
    Closes https://github.com/apache/arrow/issues/32240 / https://issues.apache.org/jira/browse/ARROW-16921
    
    # What changes are included in this PR?
    
    This PR implements decompression support for Arrow IPC format files and streams in the dotnet/C# library.
    
    The main concern raised in the above Jira issue was that we don't want to add new NuGet package dependencies to support decompression formats that won't be needed by most users, so a default `CompressionProvider` implementation has been added that uses reflection to use the `ZstdNet` package for ZSTD decompression and `K4os.Compression.LZ4.Streams` and `CommunityToolkit.HighPerformance` for LZ4 Frame support if they are available. The `netstandard1.3` target has decompression support  [...]
    
    The `ArrowFileReader` and `ArrowStreamReader` constructors accept an `ICompressionCodecFactory` parameter to allow users to provide their own compression codec implementations if they want to use different dependencies.
    
    ### Alternatives to consider
    
    An alternative approach that could be considered instead of reflection is to use these extra dependencies as build time dependencies but not make them dependencies of the NuGet package. I tested this out in https://github.com/adamreeve/arrow/commit/4544afde6fef12337c7b188cc497da0bc1bf829d and it seems to work reasonably well too but required bumping the version of `System.Runtime.CompilerServices.Unsafe` under the `netstandard2.0` and `netcoreapp3.1` targets. This reduces all the refl [...]
    
    Another alternative would be to move decompression support into a separate NuGet package (e.g. `Apache.Arrow.Compression`) that depends on `Apache.Arrow` and has an implementation of `ICompressionProvider` that users can pass in to the `ArrowFileReader` constructor, or maybe has a way to register itself with the `Apache.Arrow` package so it only needs to be configured once. That would seem cleaner to me, but I'm not sure how much work it would be to set up a whole new package.
    
    # Are these changes tested?
    
    Yes, new unit tests have been added. Because this PR adds only decompression support and not compression support, the test files were created with a Python script that is included in the PR.
    
    # Are there any user-facing changes?
    
    Yes, this implements a new feature, but in a backwards-compatible way.
    * Closes: #32240
    
    Authored-by: Adam Reeve <ad...@gmail.com>
    Signed-off-by: Will Jones <wi...@gmail.com>
---
 csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs     |  19 +++++-
 .../Ipc/ArrowFileReaderImplementation.cs           |   4 +-
 .../Apache.Arrow/Ipc/ArrowReaderImplementation.cs  |  72 +++++++++++++++------
 csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs   |  23 +++++--
 .../Ipc/ArrowStreamReaderImplementation.cs         |   2 +-
 .../src/Apache.Arrow/Ipc/CompressionCodecType.cs   |  23 +++++++
 .../Apache.Arrow/Ipc/DecompressingBufferCreator.cs |  69 ++++++++++++++++++++
 csharp/src/Apache.Arrow/Ipc/IBufferCreator.cs      |  27 ++++++++
 csharp/src/Apache.Arrow/Ipc/ICompressionCodec.cs   |  33 ++++++++++
 .../Apache.Arrow/Ipc/ICompressionCodecFactory.cs   |  25 +++++++
 csharp/src/Apache.Arrow/Ipc/NoOpBufferCreator.cs   |  34 ++++++++++
 .../Apache.Arrow.Tests/Apache.Arrow.Tests.csproj   |  14 ++++
 .../Apache.Arrow.Tests/ArrowFileReaderTests.cs     |  40 +++++++++++-
 .../Apache.Arrow.Tests/ArrowStreamReaderTests.cs   |  27 ++++++++
 .../Compression/CompressionCodecFactory.cs         |  33 ++++++++++
 .../Compression/Lz4CompressionCodec.cs             |  38 +++++++++++
 .../Compression/ZstdCompressionCodec.cs            |  41 ++++++++++++
 .../Resources/ipc_lz4_compression.arrow            | Bin 0 -> 1506 bytes
 .../Resources/ipc_lz4_compression.arrow_stream     | Bin 0 -> 1256 bytes
 .../Resources/ipc_zstd_compression.arrow           | Bin 0 -> 1170 bytes
 .../Resources/ipc_zstd_compression.arrow_stream    | Bin 0 -> 920 bytes
 .../test/Apache.Arrow.Tests/generate_resources.py  |  52 +++++++++++++++
 22 files changed, 546 insertions(+), 30 deletions(-)

diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs b/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
index e0064be376..935a1e7fb7 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowFileReader.cs
@@ -35,18 +35,33 @@ namespace Apache.Arrow.Ipc
         {
         }
 
+        public ArrowFileReader(Stream stream, ICompressionCodecFactory compressionCodecFactory)
+            : this(stream, allocator: null, compressionCodecFactory, leaveOpen: false)
+        {
+        }
+
         public ArrowFileReader(Stream stream, MemoryAllocator allocator)
             : this(stream, allocator, leaveOpen: false)
         {
         }
 
+        public ArrowFileReader(Stream stream, MemoryAllocator allocator, ICompressionCodecFactory compressionCodecFactory)
+            : this(stream, allocator, compressionCodecFactory, leaveOpen: false)
+        {
+        }
+
         public ArrowFileReader(Stream stream, bool leaveOpen)
-            : this(stream, allocator: null, leaveOpen)
+            : this(stream, allocator: null, compressionCodecFactory: null, leaveOpen)
         {
         }
 
         public ArrowFileReader(Stream stream, MemoryAllocator allocator, bool leaveOpen)
-            : base(new ArrowFileReaderImplementation(stream, allocator, leaveOpen))
+            : this(stream, allocator, compressionCodecFactory: null, leaveOpen)
+        {
+        }
+
+        public ArrowFileReader(Stream stream, MemoryAllocator allocator, ICompressionCodecFactory compressionCodecFactory, bool leaveOpen)
+            : base(new ArrowFileReaderImplementation(stream, allocator, compressionCodecFactory, leaveOpen))
         {
         }
 
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs b/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
index 36cd4ddf93..d88665e496 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
@@ -35,8 +35,8 @@ namespace Apache.Arrow.Ipc
 
         private ArrowFooter _footer;
 
-        public ArrowFileReaderImplementation(Stream stream, MemoryAllocator allocator, bool leaveOpen)
-            : base(stream, allocator, leaveOpen)
+        public ArrowFileReaderImplementation(Stream stream, MemoryAllocator allocator, ICompressionCodecFactory compressionCodecFactory, bool leaveOpen)
+            : base(stream, allocator, compressionCodecFactory, leaveOpen)
         {
         }
 
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs b/csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
index 35199477b8..4f87d88f61 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
@@ -21,8 +21,10 @@ using System.Diagnostics;
 using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
+using Apache.Arrow.Flatbuf;
 using Apache.Arrow.Types;
 using Apache.Arrow.Memory;
+using Type = System.Type;
 
 namespace Apache.Arrow.Ipc
 {
@@ -34,13 +36,15 @@ namespace Apache.Arrow.Ipc
         private protected DictionaryMemo _dictionaryMemo;
         private protected DictionaryMemo DictionaryMemo => _dictionaryMemo ??= new DictionaryMemo();
         private protected readonly MemoryAllocator _allocator;
+        private readonly ICompressionCodecFactory _compressionCodecFactory;
 
-        private protected ArrowReaderImplementation() : this(null)
+        private protected ArrowReaderImplementation() : this(null, null)
         { }
 
-        private protected ArrowReaderImplementation(MemoryAllocator allocator)
+        private protected ArrowReaderImplementation(MemoryAllocator allocator, ICompressionCodecFactory compressionCodecFactory)
         {
             _allocator = allocator ?? MemoryAllocator.Default.Value;
+            _compressionCodecFactory = compressionCodecFactory;
         }
 
         public void Dispose()
@@ -174,6 +178,7 @@ namespace Apache.Arrow.Ipc
                 return arrays;
             }
 
+            using var bufferCreator = GetBufferCreator(recordBatchMessage.Compression);
             var recordBatchEnumerator = new RecordBatchEnumerator(in recordBatchMessage);
             int schemaFieldIndex = 0;
             do
@@ -182,8 +187,8 @@ namespace Apache.Arrow.Ipc
                 Flatbuf.FieldNode fieldNode = recordBatchEnumerator.CurrentNode;
 
                 ArrayData arrayData = field.DataType.IsFixedPrimitive()
-                    ? LoadPrimitiveField(ref recordBatchEnumerator, field, in fieldNode, messageBuffer)
-                    : LoadVariableField(ref recordBatchEnumerator, field, in fieldNode, messageBuffer);
+                    ? LoadPrimitiveField(ref recordBatchEnumerator, field, in fieldNode, messageBuffer, bufferCreator)
+                    : LoadVariableField(ref recordBatchEnumerator, field, in fieldNode, messageBuffer, bufferCreator);
 
                 arrays.Add(ArrowArrayFactory.BuildArray(arrayData));
             } while (recordBatchEnumerator.MoveNextNode());
@@ -191,14 +196,43 @@ namespace Apache.Arrow.Ipc
             return arrays;
         }
 
+        private IBufferCreator GetBufferCreator(BodyCompression? compression)
+        {
+            if (!compression.HasValue)
+            {
+                return new NoOpBufferCreator();
+            }
+
+            var method = compression.Value.Method;
+            if (method != BodyCompressionMethod.BUFFER)
+            {
+                throw new NotImplementedException($"Compression method {method} is not supported");
+            }
+
+            var codec = compression.Value.Codec;
+            if (_compressionCodecFactory == null)
+            {
+                throw new Exception(
+                    $"Body is compressed with codec {codec} but no {nameof(ICompressionCodecFactory)} has been configured to decompress buffers");
+            }
+            var decompressor = codec switch
+            {
+                Apache.Arrow.Flatbuf.CompressionType.LZ4_FRAME => _compressionCodecFactory.CreateCodec(CompressionCodecType.Lz4Frame),
+                Apache.Arrow.Flatbuf.CompressionType.ZSTD => _compressionCodecFactory.CreateCodec(CompressionCodecType.Zstd),
+                _ => throw new NotImplementedException($"Compression codec {codec} is not supported")
+            };
+            return new DecompressingBufferCreator(decompressor, _allocator);
+        }
+
         private ArrayData LoadPrimitiveField(
             ref RecordBatchEnumerator recordBatchEnumerator,
             Field field,
             in Flatbuf.FieldNode fieldNode,
-            ByteBuffer bodyData)
+            ByteBuffer bodyData,
+            IBufferCreator bufferCreator)
         {
 
-            ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
+            ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer, bufferCreator);
             if (!recordBatchEnumerator.MoveNextBuffer())
             {
                 throw new Exception("Unable to move to the next buffer.");
@@ -224,13 +258,13 @@ namespace Apache.Arrow.Ipc
             }
             else
             {
-                ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
+                ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer, bufferCreator);
                 recordBatchEnumerator.MoveNextBuffer();
 
                 arrowBuff = new[] { nullArrowBuffer, valueArrowBuffer };
             }
 
-            ArrayData[] children = GetChildren(ref recordBatchEnumerator, field, bodyData);
+            ArrayData[] children = GetChildren(ref recordBatchEnumerator, field, bodyData, bufferCreator);
 
             IArrowArray dictionary = null;
             if (field.DataType.TypeId == ArrowTypeId.Dictionary)
@@ -246,20 +280,21 @@ namespace Apache.Arrow.Ipc
             ref RecordBatchEnumerator recordBatchEnumerator,
             Field field,
             in Flatbuf.FieldNode fieldNode,
-            ByteBuffer bodyData)
+            ByteBuffer bodyData,
+            IBufferCreator bufferCreator)
         {
 
-            ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
+            ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer, bufferCreator);
             if (!recordBatchEnumerator.MoveNextBuffer())
             {
                 throw new Exception("Unable to move to the next buffer.");
             }
-            ArrowBuffer offsetArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
+            ArrowBuffer offsetArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer, bufferCreator);
             if (!recordBatchEnumerator.MoveNextBuffer())
             {
                 throw new Exception("Unable to move to the next buffer.");
             }
-            ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
+            ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer, bufferCreator);
             recordBatchEnumerator.MoveNextBuffer();
 
             int fieldLength = (int)fieldNode.Length;
@@ -276,7 +311,7 @@ namespace Apache.Arrow.Ipc
             }
 
             ArrowBuffer[] arrowBuff = new[] { nullArrowBuffer, offsetArrowBuffer, valueArrowBuffer };
-            ArrayData[] children = GetChildren(ref recordBatchEnumerator, field, bodyData);
+            ArrayData[] children = GetChildren(ref recordBatchEnumerator, field, bodyData, bufferCreator);
 
             IArrowArray dictionary = null;
             if (field.DataType.TypeId == ArrowTypeId.Dictionary)
@@ -291,7 +326,8 @@ namespace Apache.Arrow.Ipc
         private ArrayData[] GetChildren(
             ref RecordBatchEnumerator recordBatchEnumerator,
             Field field,
-            ByteBuffer bodyData)
+            ByteBuffer bodyData,
+            IBufferCreator bufferCreator)
         {
             if (!(field.DataType is NestedType type)) return null;
 
@@ -304,15 +340,15 @@ namespace Apache.Arrow.Ipc
 
                 Field childField = type.Fields[index];
                 ArrayData child = childField.DataType.IsFixedPrimitive()
-                    ? LoadPrimitiveField(ref recordBatchEnumerator, childField, in childFieldNode, bodyData)
-                    : LoadVariableField(ref recordBatchEnumerator, childField, in childFieldNode, bodyData);
+                    ? LoadPrimitiveField(ref recordBatchEnumerator, childField, in childFieldNode, bodyData, bufferCreator)
+                    : LoadVariableField(ref recordBatchEnumerator, childField, in childFieldNode, bodyData, bufferCreator);
 
                 children[index] = child;
             }
             return children;
         }
 
-        private ArrowBuffer BuildArrowBuffer(ByteBuffer bodyData, Flatbuf.Buffer buffer)
+        private ArrowBuffer BuildArrowBuffer(ByteBuffer bodyData, Flatbuf.Buffer buffer, IBufferCreator bufferCreator)
         {
             if (buffer.Length <= 0)
             {
@@ -323,7 +359,7 @@ namespace Apache.Arrow.Ipc
             int length = (int)buffer.Length;
 
             var data = bodyData.ToReadOnlyMemory(offset, length);
-            return new ArrowBuffer(data);
+            return bufferCreator.CreateBuffer(data);
         }
     }
 
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
index 5e7d7befb7..79dc77f25d 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReader.cs
@@ -31,26 +31,41 @@ namespace Apache.Arrow.Ipc
         public Schema Schema => _implementation.Schema;
 
         public ArrowStreamReader(Stream stream)
-            : this(stream, allocator: null, leaveOpen: false)
+            : this(stream, allocator: null, compressionCodecFactory: null, leaveOpen: false)
         {
         }
 
         public ArrowStreamReader(Stream stream, MemoryAllocator allocator)
-            : this(stream, allocator, leaveOpen: false)
+            : this(stream, allocator, compressionCodecFactory: null, leaveOpen: false)
+        {
+        }
+
+        public ArrowStreamReader(Stream stream, ICompressionCodecFactory compressionCodecFactory)
+            : this(stream, allocator: null, compressionCodecFactory, leaveOpen: false)
         {
         }
 
         public ArrowStreamReader(Stream stream, bool leaveOpen)
-            : this(stream, allocator: null, leaveOpen)
+            : this(stream, allocator: null, compressionCodecFactory: null, leaveOpen)
         {
         }
 
         public ArrowStreamReader(Stream stream, MemoryAllocator allocator, bool leaveOpen)
+            : this(stream, allocator, compressionCodecFactory: null, leaveOpen)
+        {
+        }
+
+        public ArrowStreamReader(Stream stream, ICompressionCodecFactory compressionCodecFactory, bool leaveOpen)
+            : this(stream, allocator: null, compressionCodecFactory, leaveOpen)
+        {
+        }
+
+        public ArrowStreamReader(Stream stream, MemoryAllocator allocator, ICompressionCodecFactory compressionCodecFactory, bool leaveOpen)
         {
             if (stream == null)
                 throw new ArgumentNullException(nameof(stream));
 
-            _implementation = new ArrowStreamReaderImplementation(stream, allocator, leaveOpen);
+            _implementation = new ArrowStreamReaderImplementation(stream, allocator, compressionCodecFactory, leaveOpen);
         }
 
         public ArrowStreamReader(ReadOnlyMemory<byte> buffer)
diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
index ffbe956698..8ae8fc49ca 100644
--- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
+++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
@@ -27,7 +27,7 @@ namespace Apache.Arrow.Ipc
         public Stream BaseStream { get; }
         private readonly bool _leaveOpen;
 
-        public ArrowStreamReaderImplementation(Stream stream, MemoryAllocator allocator, bool leaveOpen) : base(allocator)
+        public ArrowStreamReaderImplementation(Stream stream, MemoryAllocator allocator, ICompressionCodecFactory compressionCodecFactory, bool leaveOpen) : base(allocator, compressionCodecFactory)
         {
             BaseStream = stream;
             _leaveOpen = leaveOpen;
diff --git a/csharp/src/Apache.Arrow/Ipc/CompressionCodecType.cs b/csharp/src/Apache.Arrow/Ipc/CompressionCodecType.cs
new file mode 100644
index 0000000000..6e8ecaa2da
--- /dev/null
+++ b/csharp/src/Apache.Arrow/Ipc/CompressionCodecType.cs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+namespace Apache.Arrow.Ipc
+{
+    public enum CompressionCodecType
+    {
+        Lz4Frame,
+        Zstd,
+    }
+}
diff --git a/csharp/src/Apache.Arrow/Ipc/DecompressingBufferCreator.cs b/csharp/src/Apache.Arrow/Ipc/DecompressingBufferCreator.cs
new file mode 100644
index 0000000000..dd8654ada6
--- /dev/null
+++ b/csharp/src/Apache.Arrow/Ipc/DecompressingBufferCreator.cs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Buffers.Binary;
+using Apache.Arrow.Memory;
+
+namespace Apache.Arrow.Ipc
+{
+    /// <summary>
+    /// Creates Arrow buffers from compressed data
+    /// </summary>
+    internal sealed class DecompressingBufferCreator : IBufferCreator
+    {
+        private readonly ICompressionCodec _compressionCodec;
+        private readonly MemoryAllocator _allocator;
+
+        public DecompressingBufferCreator(ICompressionCodec compressionCodec, MemoryAllocator allocator)
+        {
+            _compressionCodec = compressionCodec;
+            _allocator = allocator;
+        }
+
+        public ArrowBuffer CreateBuffer(ReadOnlyMemory<byte> source)
+        {
+            // See the BodyCompressionMethod enum in format/Message.fbs
+            // for documentation on the Buffer compression method used here.
+
+            if (source.Length < 8)
+            {
+                throw new Exception($"Invalid compressed data buffer size ({source.Length}), expected at least 8 bytes");
+            }
+
+            // First 8 bytes give the uncompressed data length
+            var uncompressedLength = BinaryPrimitives.ReadInt64LittleEndian(source.Span.Slice(0, 8));
+            if (uncompressedLength == -1)
+            {
+                // The buffer is not actually compressed
+                return new ArrowBuffer(source.Slice(8));
+            }
+
+            var outputData = _allocator.Allocate(Convert.ToInt32(uncompressedLength));
+            var decompressedLength = _compressionCodec.Decompress(source.Slice(8), outputData.Memory);
+            if (decompressedLength != uncompressedLength)
+            {
+                throw new Exception($"Expected to decompress {uncompressedLength} bytes, but got {decompressedLength} bytes");
+            }
+
+            return new ArrowBuffer(outputData);
+        }
+
+        public void Dispose()
+        {
+            _compressionCodec.Dispose();
+        }
+    }
+}
diff --git a/csharp/src/Apache.Arrow/Ipc/IBufferCreator.cs b/csharp/src/Apache.Arrow/Ipc/IBufferCreator.cs
new file mode 100644
index 0000000000..a6be81f24d
--- /dev/null
+++ b/csharp/src/Apache.Arrow/Ipc/IBufferCreator.cs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+
+namespace Apache.Arrow.Ipc
+{
+    /// <summary>
+    /// Creates Arrow buffers from possibly compressed data
+    /// </summary>
+    internal interface IBufferCreator : IDisposable
+    {
+        ArrowBuffer CreateBuffer(ReadOnlyMemory<byte> source);
+    }
+}
diff --git a/csharp/src/Apache.Arrow/Ipc/ICompressionCodec.cs b/csharp/src/Apache.Arrow/Ipc/ICompressionCodec.cs
new file mode 100644
index 0000000000..b18ca3a5e4
--- /dev/null
+++ b/csharp/src/Apache.Arrow/Ipc/ICompressionCodec.cs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+
+namespace Apache.Arrow.Ipc
+{
+    /// <summary>
+    /// Codec for data compression and decompression (currently only decompression is supported)
+    /// </summary>
+    public interface ICompressionCodec : IDisposable
+    {
+        /// <summary>
+        /// Decompresses a compressed data buffer
+        /// </summary>
+        /// <param name="source">Data buffer to read compressed data from</param>
+        /// <param name="destination">Data buffer to write decompressed data to</param>
+        /// <returns>The number of decompressed bytes written into the destination</returns>
+        int Decompress(ReadOnlyMemory<byte> source, Memory<byte> destination);
+    }
+}
diff --git a/csharp/src/Apache.Arrow/Ipc/ICompressionCodecFactory.cs b/csharp/src/Apache.Arrow/Ipc/ICompressionCodecFactory.cs
new file mode 100644
index 0000000000..5422a033bd
--- /dev/null
+++ b/csharp/src/Apache.Arrow/Ipc/ICompressionCodecFactory.cs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+namespace Apache.Arrow.Ipc
+{
+    /// <summary>
+    /// Provides compression codec implementations for different compression codecs
+    /// </summary>
+    public interface ICompressionCodecFactory
+    {
+        ICompressionCodec CreateCodec(CompressionCodecType compressionCodecType);
+    }
+}
diff --git a/csharp/src/Apache.Arrow/Ipc/NoOpBufferCreator.cs b/csharp/src/Apache.Arrow/Ipc/NoOpBufferCreator.cs
new file mode 100644
index 0000000000..8681aea4c9
--- /dev/null
+++ b/csharp/src/Apache.Arrow/Ipc/NoOpBufferCreator.cs
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+
+namespace Apache.Arrow.Ipc
+{
+    /// <summary>
+    /// Creates Arrow buffers from uncompressed data
+    /// </summary>
+    internal sealed class NoOpBufferCreator : IBufferCreator
+    {
+        public ArrowBuffer CreateBuffer(ReadOnlyMemory<byte> source)
+        {
+            return new ArrowBuffer(source);
+        }
+
+        public void Dispose()
+        {
+        }
+    }
+}
diff --git a/csharp/test/Apache.Arrow.Tests/Apache.Arrow.Tests.csproj b/csharp/test/Apache.Arrow.Tests/Apache.Arrow.Tests.csproj
index b0de6df148..f41261767c 100644
--- a/csharp/test/Apache.Arrow.Tests/Apache.Arrow.Tests.csproj
+++ b/csharp/test/Apache.Arrow.Tests/Apache.Arrow.Tests.csproj
@@ -13,10 +13,24 @@
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
     </PackageReference>
+    <PackageReference Include="CommunityToolkit.HighPerformance" Version="8.0.0" />
+    <PackageReference Include="K4os.Compression.LZ4.Streams" Version="1.3.5" />
+    <PackageReference Include="ZstdSharp.Port" Version="0.6.7" />
   </ItemGroup>
 
   <ItemGroup>
     <ProjectReference Include="..\..\src\Apache.Arrow\Apache.Arrow.csproj" />
   </ItemGroup>
 
+  <ItemGroup>
+    <None Remove="Resources\ipc_lz4_compression.arrow" />
+    <EmbeddedResource Include="Resources\ipc_lz4_compression.arrow" />
+    <None Remove="Resources\ipc_zstd_compression.arrow" />
+    <EmbeddedResource Include="Resources\ipc_zstd_compression.arrow" />
+    <None Remove="Resources\ipc_lz4_compression.arrow_stream" />
+    <EmbeddedResource Include="Resources\ipc_lz4_compression.arrow_stream" />
+    <None Remove="Resources\ipc_zstd_compression.arrow_stream" />
+    <EmbeddedResource Include="Resources\ipc_zstd_compression.arrow_stream" />
+  </ItemGroup>
+
 </Project>
\ No newline at end of file
diff --git a/csharp/test/Apache.Arrow.Tests/ArrowFileReaderTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowFileReaderTests.cs
index cc7f0051a5..55f6d9904f 100644
--- a/csharp/test/Apache.Arrow.Tests/ArrowFileReaderTests.cs
+++ b/csharp/test/Apache.Arrow.Tests/ArrowFileReaderTests.cs
@@ -14,11 +14,9 @@
 // limitations under the License.
 
 using Apache.Arrow.Ipc;
-using Apache.Arrow.Memory;
-using Apache.Arrow.Types;
 using System;
-using System.Collections.Generic;
 using System.IO;
+using System.Reflection;
 using System.Threading.Tasks;
 using Xunit;
 
@@ -171,5 +169,55 @@ namespace Apache.Arrow.Tests
             recordBatch.Dispose();
         }
 
+        /// <summary>
+        /// Verifies that LZ4 Frame and Zstd compressed IPC files can be read when a
+        /// compression codec factory is passed to the reader.
+        /// </summary>
+        /// <param name="fileName">Name of the embedded resource file to read.</param>
+        [Theory]
+        [InlineData("ipc_lz4_compression.arrow")]
+        [InlineData("ipc_zstd_compression.arrow")]
+        public void CanReadCompressedIpcFile(string fileName)
+        {
+            var assembly = Assembly.GetExecutingAssembly();
+            using var stream = assembly.GetManifestResourceStream($"Apache.Arrow.Tests.Resources.{fileName}");
+            // GetManifestResourceStream returns null for an unknown resource name; fail clearly in that case.
+            Assert.NotNull(stream);
+            var codecFactory = new Compression.CompressionCodecFactory();
+            using var reader = new ArrowFileReader(stream, codecFactory);
+
+            // Dispose the batch when the test ends, matching the explicit recordBatch.Dispose() used elsewhere in this class.
+            using var batch = reader.ReadNextRecordBatch();
+
+            var intArray = (Int32Array) batch.Column("integers");
+            var floatArray = (FloatArray) batch.Column("floats");
+
+            // The embedded files hold 100 rows: integers 0..99 and floats 0.1 * i (see generate_resources.py).
+            const int numRows = 100;
+            Assert.Equal(numRows, intArray.Length);
+            Assert.Equal(numRows, floatArray.Length);
+
+            for (var i = 0; i < numRows; ++i)
+            {
+                Assert.Equal(i, intArray.GetValue(i));
+                Assert.True(Math.Abs(floatArray.GetValue(i).Value - 0.1f * i) < 1.0e-6);
+            }
+        }
+
+        /// <summary>
+        /// Verifies that reading a compressed IPC file without a codec factory fails with
+        /// an error stating that no ICompressionCodecFactory has been configured.
+        /// </summary>
+        [Fact]
+        public void ErrorReadingCompressedFileWithoutCodecFactory()
+        {
+            var assembly = Assembly.GetExecutingAssembly();
+            using var stream = assembly.GetManifestResourceStream("Apache.Arrow.Tests.Resources.ipc_lz4_compression.arrow");
+            Assert.NotNull(stream);
+            using var reader = new ArrowFileReader(stream);
+
+            var exception = Assert.Throws<Exception>(() => reader.ReadNextRecordBatch());
+            Assert.Contains("no ICompressionCodecFactory has been configured", exception.Message);
+        }
     }
 }
diff --git a/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
index 973fc6a0a0..1eadcd00fa 100644
--- a/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
+++ b/csharp/test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs
@@ -17,6 +17,7 @@ using Apache.Arrow.Ipc;
 using Apache.Arrow.Memory;
 using System;
 using System.IO;
+using System.Reflection;
 using System.Threading;
 using System.Threading.Tasks;
 using Xunit;
@@ -194,6 +195,41 @@ namespace Apache.Arrow.Tests
             await TestReaderFromPartialReadStream(ArrowReaderVerifier.VerifyReaderAsync, createDictionaryArray);
         }
 
+        /// <summary>
+        /// Verifies that LZ4 Frame and Zstd compressed IPC streams can be read when a
+        /// compression codec factory is passed to the reader.
+        /// </summary>
+        /// <param name="fileName">Name of the embedded resource file to read.</param>
+        [Theory]
+        [InlineData("ipc_lz4_compression.arrow_stream")]
+        [InlineData("ipc_zstd_compression.arrow_stream")]
+        public void CanReadCompressedIpcStream(string fileName)
+        {
+            var assembly = Assembly.GetExecutingAssembly();
+            using var stream = assembly.GetManifestResourceStream($"Apache.Arrow.Tests.Resources.{fileName}");
+            // GetManifestResourceStream returns null for an unknown resource name; fail clearly in that case.
+            Assert.NotNull(stream);
+            var codecFactory = new Compression.CompressionCodecFactory();
+            using var reader = new ArrowStreamReader(stream, codecFactory);
+
+            // The record batch is disposable; release it when the test finishes.
+            using var batch = reader.ReadNextRecordBatch();
+
+            var intArray = (Int32Array) batch.Column("integers");
+            var floatArray = (FloatArray) batch.Column("floats");
+
+            // The embedded files hold 100 rows: integers 0..99 and floats 0.1 * i (see generate_resources.py).
+            const int numRows = 100;
+            Assert.Equal(numRows, intArray.Length);
+            Assert.Equal(numRows, floatArray.Length);
+
+            for (var i = 0; i < numRows; ++i)
+            {
+                Assert.Equal(i, intArray.GetValue(i));
+                Assert.True(Math.Abs(floatArray.GetValue(i).Value - 0.1f * i) < 1.0e-6);
+            }
+        }
+
         /// <summary>
         /// Verifies that the stream reader reads multiple times when a stream
         /// only returns a subset of the data from each Read.
diff --git a/csharp/test/Apache.Arrow.Tests/Compression/CompressionCodecFactory.cs b/csharp/test/Apache.Arrow.Tests/Compression/CompressionCodecFactory.cs
new file mode 100644
index 0000000000..8b2e8e4b71
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Tests/Compression/CompressionCodecFactory.cs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using Apache.Arrow.Ipc;
+
+namespace Apache.Arrow.Tests.Compression
+{
+    /// <summary>
+    /// Codec factory used by the tests; maps each supported IPC compression
+    /// type to the matching test codec implementation.
+    /// </summary>
+    internal sealed class CompressionCodecFactory : ICompressionCodecFactory
+    {
+        /// <summary>
+        /// Creates a new codec for the requested compression type.
+        /// </summary>
+        /// <param name="compressionCodecType">The compression type to create a codec for.</param>
+        /// <returns>A new codec instance for the given type.</returns>
+        /// <exception cref="NotImplementedException">Thrown for any type other than Lz4Frame or Zstd.</exception>
+        public ICompressionCodec CreateCodec(CompressionCodecType compressionCodecType)
+        {
+            switch (compressionCodecType)
+            {
+                case CompressionCodecType.Lz4Frame:
+                    return new Lz4CompressionCodec();
+                case CompressionCodecType.Zstd:
+                    return new ZstdCompressionCodec();
+                default:
+                    throw new NotImplementedException($"Compression type {compressionCodecType} is not supported");
+            }
+        }
+    }
+}
diff --git a/csharp/test/Apache.Arrow.Tests/Compression/Lz4CompressionCodec.cs b/csharp/test/Apache.Arrow.Tests/Compression/Lz4CompressionCodec.cs
new file mode 100644
index 0000000000..c1249b3629
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Tests/Compression/Lz4CompressionCodec.cs
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using Apache.Arrow.Ipc;
+using CommunityToolkit.HighPerformance;
+using K4os.Compression.LZ4.Streams;
+
+namespace Apache.Arrow.Tests.Compression
+{
+    /// <summary>
+    /// Test codec that decompresses LZ4 Frame data using K4os.Compression.LZ4.Streams.
+    /// </summary>
+    internal sealed class Lz4CompressionCodec : ICompressionCodec
+    {
+        /// <summary>
+        /// Decompresses an LZ4 frame from <paramref name="source"/> into <paramref name="destination"/>.
+        /// </summary>
+        /// <param name="source">The compressed LZ4 frame.</param>
+        /// <param name="destination">The buffer that receives the decompressed bytes.</param>
+        /// <returns>The number of bytes written to <paramref name="destination"/>.</returns>
+        public int Decompress(ReadOnlyMemory<byte> source, Memory<byte> destination)
+        {
+            using var sourceStream = source.AsStream();
+            using var destStream = destination.AsStream();
+            using var decompressedStream = LZ4Stream.Decode(sourceStream);
+            decompressedStream.CopyTo(destStream);
+            // Position is the number of bytes actually written; Length is the fixed size of
+            // the destination buffer and would hide a short decompression from the caller.
+            return (int) destStream.Position;
+        }
+
+        /// <summary>
+        /// Nothing to release: every stream is disposed inside <see cref="Decompress"/>.
+        /// </summary>
+        public void Dispose()
+        {
+        }
+    }
+}
diff --git a/csharp/test/Apache.Arrow.Tests/Compression/ZstdCompressionCodec.cs b/csharp/test/Apache.Arrow.Tests/Compression/ZstdCompressionCodec.cs
new file mode 100644
index 0000000000..0993bd489b
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Tests/Compression/ZstdCompressionCodec.cs
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using Apache.Arrow.Ipc;
+using ZstdSharp;
+
+namespace Apache.Arrow.Tests.Compression
+{
+    /// <summary>
+    /// Test codec that decompresses Zstd data with a ZstdSharp decompressor
+    /// owned by the codec instance.
+    /// </summary>
+    internal sealed class ZstdCompressionCodec : ICompressionCodec
+    {
+        private readonly Decompressor _decompressor = new Decompressor();
+
+        /// <summary>
+        /// Decompresses <paramref name="source"/> into <paramref name="destination"/>.
+        /// </summary>
+        /// <returns>The value returned by the decompressor's Unwrap call.</returns>
+        public int Decompress(ReadOnlyMemory<byte> source, Memory<byte> destination)
+            => _decompressor.Unwrap(source.Span, destination.Span);
+
+        /// <summary>
+        /// Disposes the underlying decompressor.
+        /// </summary>
+        public void Dispose() => _decompressor.Dispose();
+    }
+}
diff --git a/csharp/test/Apache.Arrow.Tests/Resources/ipc_lz4_compression.arrow b/csharp/test/Apache.Arrow.Tests/Resources/ipc_lz4_compression.arrow
new file mode 100644
index 0000000000..f4ef4ac121
Binary files /dev/null and b/csharp/test/Apache.Arrow.Tests/Resources/ipc_lz4_compression.arrow differ
diff --git a/csharp/test/Apache.Arrow.Tests/Resources/ipc_lz4_compression.arrow_stream b/csharp/test/Apache.Arrow.Tests/Resources/ipc_lz4_compression.arrow_stream
new file mode 100644
index 0000000000..50b25029bb
Binary files /dev/null and b/csharp/test/Apache.Arrow.Tests/Resources/ipc_lz4_compression.arrow_stream differ
diff --git a/csharp/test/Apache.Arrow.Tests/Resources/ipc_zstd_compression.arrow b/csharp/test/Apache.Arrow.Tests/Resources/ipc_zstd_compression.arrow
new file mode 100644
index 0000000000..2c943c52e2
Binary files /dev/null and b/csharp/test/Apache.Arrow.Tests/Resources/ipc_zstd_compression.arrow differ
diff --git a/csharp/test/Apache.Arrow.Tests/Resources/ipc_zstd_compression.arrow_stream b/csharp/test/Apache.Arrow.Tests/Resources/ipc_zstd_compression.arrow_stream
new file mode 100644
index 0000000000..8923e9520d
Binary files /dev/null and b/csharp/test/Apache.Arrow.Tests/Resources/ipc_zstd_compression.arrow_stream differ
diff --git a/csharp/test/Apache.Arrow.Tests/generate_resources.py b/csharp/test/Apache.Arrow.Tests/generate_resources.py
new file mode 100644
index 0000000000..c53a3bd85d
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Tests/generate_resources.py
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Generates files required for tests where it's not possible to generate
+them with dotnet Arrow.
+"""
+
+import pyarrow as pa
+from pathlib import Path
+
+
+def write_compressed_ipc_file(path: Path, compression: str, stream: bool = False):
+    """Write a single-batch compressed Arrow IPC file or stream to ``path``.
+
+    The batch holds 100 rows in two columns: ``integers`` (int32 values
+    0..99) and ``floats`` (float32 values ``x * 0.1`` for the same range).
+
+    Args:
+        path: Location of the file to create.
+        compression: Codec name passed to ``pa.ipc.IpcWriteOptions``.
+        stream: Write the IPC stream format when true, otherwise the IPC
+            file format.
+    """
+    int_type, float_type = pa.int32(), pa.float32()
+    schema = pa.schema([
+        pa.field('integers', int_type),
+        pa.field('floats', float_type),
+    ])
+
+    row_ids = range(100)
+    columns = [
+        pa.array(row_ids, type=int_type),
+        pa.array([x * 0.1 for x in row_ids], type=float_type),
+    ]
+    batch = pa.record_batch(columns, schema)
+
+    # Both writer factories are called with the same arguments, so pick one up front.
+    open_writer = pa.ipc.new_stream if stream else pa.ipc.new_file
+    write_options = pa.ipc.IpcWriteOptions(compression=compression)
+
+    with pa.OSFile(path.as_posix(), 'wb') as sink:
+        with open_writer(sink, schema, options=write_options) as writer:
+            writer.write(batch)
+
+
+if __name__ == '__main__':
+    resource_dir = Path(__file__).resolve().parent / 'Resources'
+
+    # (file name, compression codec, write in stream format?)
+    resources = [
+        ('ipc_lz4_compression.arrow', 'lz4', False),
+        ('ipc_lz4_compression.arrow_stream', 'lz4', True),
+        ('ipc_zstd_compression.arrow', 'zstd', False),
+        ('ipc_zstd_compression.arrow_stream', 'zstd', True),
+    ]
+    for file_name, codec, as_stream in resources:
+        write_compressed_ipc_file(resource_dir / file_name, codec, stream=as_stream)