You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/02/01 19:58:43 UTC

[avro] branch master updated: AVRO-3223: Support optional codecs in C# library (#1358)

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new e8cd440  AVRO-3223: Support optional codecs in C# library (#1358)
e8cd440 is described below

commit e8cd44067de5090ec6371e97b4b7dba4ba230769
Author: Tiago Margalho <ti...@gmail.com>
AuthorDate: Tue Feb 1 19:55:09 2022 +0000

    AVRO-3223: Support optional codecs in C# library (#1358)
    
    * AVRO-3223: Support optional codecs in C# library
    
    * AVRO-3223: Mark old method as obsolete
---
 lang/csharp/src/apache/main/File/Codec.cs          | 48 +++++++++++-
 lang/csharp/src/apache/main/File/DataFileReader.cs |  2 +-
 lang/csharp/src/apache/main/File/DeflateCodec.cs   |  3 +-
 lang/csharp/src/apache/main/File/NullCodec.cs      |  2 +-
 lang/csharp/src/apache/test/File/FileTests.cs      | 87 ++++++++++++++++++++++
 5 files changed, 136 insertions(+), 6 deletions(-)

diff --git a/lang/csharp/src/apache/main/File/Codec.cs b/lang/csharp/src/apache/main/File/Codec.cs
index 4afb60e..9aa7de4 100644
--- a/lang/csharp/src/apache/main/File/Codec.cs
+++ b/lang/csharp/src/apache/main/File/Codec.cs
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 
+using System;
+using System.Collections.Generic;
 using System.IO;
 
 namespace Avro.File
@@ -45,9 +47,21 @@ namespace Avro.File
         /// <summary>
         /// Decompress data using implemented codec.
         /// </summary>
-        /// <param name="compressedData">The compressed data.</param>
-        /// <returns>byte array.</returns>
-        public abstract byte[] Decompress(byte[] compressedData);
+        /// <param name="compressedData">The buffer holding data to decompress.</param>
+        /// <returns>A byte array holding the decompressed data.</returns>
+        [Obsolete]
+        public virtual byte[] Decompress(byte[] compressedData)
+        {
+            return Decompress(compressedData, compressedData.Length);
+        }
+        
+        /// <summary>
+        /// Decompress data using implemented codec.
+        /// </summary>
+        /// <param name="compressedData">The buffer holding data to decompress.</param>
+        /// <param name="length">The actual length of bytes to decompress from the buffer.</param>
+        /// <returns>A byte array holding the decompressed data.</returns>
+        abstract public byte[] Decompress(byte[] compressedData, int length);
 
         /// <summary>
         /// Name of this codec type.
@@ -90,6 +104,25 @@ namespace Avro.File
         }
 
         /// <summary>
+        /// Represents a function capable of resolving a codec string
+        /// with a matching codec implementation a reader can use to decompress data.
+        /// </summary>
+        /// <param name="codecMetaString">The codec string</param>
+        public delegate Codec CodecResolver(string codecMetaString);
+
+        private static readonly List<CodecResolver> _codecResolvers = new List<CodecResolver>();
+
+        /// <summary>
+        /// Registers a function that will attempt to resolve a codec identifying string
+        /// with a matching codec implementation when reading compressed Avro data.
+        /// </summary>
+        /// <param name="resolver">A function that is able to find a codec implementation for a given codec string</param>
+        public static void RegisterResolver(CodecResolver resolver)
+        {
+            _codecResolvers.Add(resolver);
+        }
+
+        /// <summary>
         /// Factory method to return child codec instance based on Codec.Type.
         /// </summary>
         /// <param name="codecType">Type of the codec.</param>
@@ -114,6 +147,15 @@ namespace Avro.File
         /// <returns>Codec based on type.</returns>
         public static Codec CreateCodecFromString(string codecType)
         {
+            foreach (var resolver in _codecResolvers)
+            {
+                var candidateCodec = resolver(codecType);
+                if (candidateCodec != null)
+                {
+                    return candidateCodec;
+                }
+            }
+            
             switch (codecType)
             {
                 case DataFileConstants.DeflateCodec:
diff --git a/lang/csharp/src/apache/main/File/DataFileReader.cs b/lang/csharp/src/apache/main/File/DataFileReader.cs
index 34b4c6e..c96b68a 100644
--- a/lang/csharp/src/apache/main/File/DataFileReader.cs
+++ b/lang/csharp/src/apache/main/File/DataFileReader.cs
@@ -334,7 +334,7 @@ namespace Avro.File
                     if (HasNextBlock())
                     {
                         _currentBlock = NextRawBlock(_currentBlock);
-                        _currentBlock.Data = _codec.Decompress(_currentBlock.Data);
+                        _currentBlock.Data = _codec.Decompress(_currentBlock.Data, (int)this._blockSize);
                         _datumDecoder = new BinaryDecoder(_currentBlock.GetDataAsStream());
                     }
                 }
diff --git a/lang/csharp/src/apache/main/File/DeflateCodec.cs b/lang/csharp/src/apache/main/File/DeflateCodec.cs
index 1a4d9a6..4de017a 100644
--- a/lang/csharp/src/apache/main/File/DeflateCodec.cs
+++ b/lang/csharp/src/apache/main/File/DeflateCodec.cs
@@ -54,8 +54,9 @@ namespace Avro.File
         }
 
         /// <inheritdoc/>
-        public override byte[] Decompress(byte[] compressedData)
+        public override byte[] Decompress(byte[] compressedData, int length)
         {
+            
             MemoryStream inStream = new MemoryStream(compressedData);
             MemoryStream outStream = new MemoryStream();
 
diff --git a/lang/csharp/src/apache/main/File/NullCodec.cs b/lang/csharp/src/apache/main/File/NullCodec.cs
index 1255941..78803dd 100644
--- a/lang/csharp/src/apache/main/File/NullCodec.cs
+++ b/lang/csharp/src/apache/main/File/NullCodec.cs
@@ -45,7 +45,7 @@ namespace Avro.File
         }
 
         /// <inheritdoc/>
-        public override byte[] Decompress(byte[] compressedData)
+        public override byte[] Decompress(byte[] compressedData, int length)
         {
             return compressedData;
         }
diff --git a/lang/csharp/src/apache/test/File/FileTests.cs b/lang/csharp/src/apache/test/File/FileTests.cs
index 9229bf4..5cde994 100644
--- a/lang/csharp/src/apache/test/File/FileTests.cs
+++ b/lang/csharp/src/apache/test/File/FileTests.cs
@@ -819,6 +819,57 @@ namespace Avro.Test.File
             }
         }
 
+        /// <summary>
+        /// Reading and writing using optional codecs
+        /// </summary>
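+        /// <param name="codecToUse">Codec name supplied by the test case.</param>
+        /// <param name="expectResolverProvidedCodec">Whether the registered resolver is expected to have supplied the codec.</param>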
+        [TestCase("zstd", true)]
+        [TestCase("deflate", false)]
+        [TestCase("null", false)]
+        public void TestOptionalCodecs(string codecToUse, bool expectResolverProvidedCodec)
+        {
+            var resolverProvidedCodec = false;
+
+            var fakeCodec = new FakeZstdCodec();
+            Codec codecResolver(string codecString)
+            {
+                if (codecString == "zstd")
+                {
+                    resolverProvidedCodec = true;
+                    return fakeCodec;
+                }
+
+                return null;
+            }
+
+            Codec.RegisterResolver(codecResolver);
+
+            RecordSchema schema = Schema.Parse( "{\"type\":\"record\", \"name\":\"n\", \"fields\":[{\"name\":\"f1\", \"type\":\"string\"},"
+                + "{\"name\":\"f2\", \"type\":\"string\"}]}" ) as RecordSchema;
+
+            foreach(var rwFactory in GenericOptions<GenericRecord>())
+            {
+                using (MemoryStream dataFileOutputStream = new MemoryStream())
+                {
+                    using (var writer = rwFactory.CreateWriter(dataFileOutputStream, schema, fakeCodec))
+                    {
+                        writer.Append(mkRecord(new [] { "f1", "f1val", "f2", "f2val" }, schema));
+                    }
+
+                    using (var dataFileInputStream = new MemoryStream(dataFileOutputStream.ToArray()))
+                    using (IFileReader<GenericRecord> reader = rwFactory.CreateReader(dataFileInputStream, schema))
+                    {
+                        GenericRecord result = reader.Next();
+                        Assert.AreEqual("f1val", result["f1"]);
+                        Assert.AreEqual("f2val", result["f2"]);
+                    }
+                }
+            }
+
+            Assert.AreEqual(expectResolverProvidedCodec, resolverProvidedCodec);
+        }
+
         private bool CheckPrimitive<T>(Stream input, T value, ReaderWriterSet<T>.ReaderFactory createReader)
         {
             IFileReader<T> reader = createReader(input, null);
@@ -1048,4 +1099,40 @@ namespace Avro.Test.File
             return string.Format("Name: {0}, Age: {1}", name, age);
         }
     }
+
+    class FakeZstdCodec : Codec
+    {
+        private DeflateCodec _codec = new DeflateCodec();
+        public override byte[] Compress(byte[] uncompressedData)
+        {
+            return _codec.Compress(uncompressedData);
+        }
+
+        public override void Compress(MemoryStream inputStream, MemoryStream outputStream)
+        {
+            _codec.Compress(inputStream, outputStream);
+        }
+
+        public override byte[] Decompress(byte[] compressedData, int length)
+        {
+            return _codec.Decompress(compressedData, length);
+        }
+
+        public override bool Equals(object other)
+        {
+            if (other == null) return false;
+
+            return this == other;
+        }
+
+        public override int GetHashCode()
+        {
+            return GetName().GetHashCode();
+        }
+
+        public override string GetName()
+        {
+            return "zstd";
+        }
+    }
 }