You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/02/01 19:58:43 UTC
[avro] branch master updated: AVRO-3223: Support optional codecs in C# library (#1358)
This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new e8cd440 AVRO-3223: Support optional codecs in C# library (#1358)
e8cd440 is described below
commit e8cd44067de5090ec6371e97b4b7dba4ba230769
Author: Tiago Margalho <ti...@gmail.com>
AuthorDate: Tue Feb 1 19:55:09 2022 +0000
AVRO-3223: Support optional codecs in C# library (#1358)
* AVRO-3223: Support optional codecs in C# library
* AVRO-3223: Mark old method as obsolete
---
lang/csharp/src/apache/main/File/Codec.cs | 48 +++++++++++-
lang/csharp/src/apache/main/File/DataFileReader.cs | 2 +-
lang/csharp/src/apache/main/File/DeflateCodec.cs | 3 +-
lang/csharp/src/apache/main/File/NullCodec.cs | 2 +-
lang/csharp/src/apache/test/File/FileTests.cs | 87 ++++++++++++++++++++++
5 files changed, 136 insertions(+), 6 deletions(-)
diff --git a/lang/csharp/src/apache/main/File/Codec.cs b/lang/csharp/src/apache/main/File/Codec.cs
index 4afb60e..9aa7de4 100644
--- a/lang/csharp/src/apache/main/File/Codec.cs
+++ b/lang/csharp/src/apache/main/File/Codec.cs
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+using System;
+using System.Collections.Generic;
using System.IO;
namespace Avro.File
@@ -45,9 +47,21 @@ namespace Avro.File
/// <summary>
/// Decompress data using implemented codec.
/// </summary>
- /// <param name="compressedData">The compressed data.</param>
- /// <returns>byte array.</returns>
- public abstract byte[] Decompress(byte[] compressedData);
+ /// <param name="compressedData">The buffer holding data to decompress; the whole buffer (compressedData.Length bytes) is treated as compressed input.</param>
+ /// <returns>A byte array holding the decompressed data.</returns>
+ [Obsolete] // NOTE(review): no message names the replacement; e.g. [Obsolete("Use Decompress(byte[], int) instead.")] would point callers at the new overload.
+ public virtual byte[] Decompress(byte[] compressedData)
+ {
+ return Decompress(compressedData, compressedData.Length); // forward to the length-aware overload, using the entire buffer
+ }
+ // NOTE(review): overrides should honor length - DeflateCodec in this change still wraps the whole buffer (new MemoryStream(compressedData)) and NullCodec returns it unchanged.
+ /// <summary>
+ /// Decompress data using implemented codec. Only the first <paramref name="length"/> bytes of <paramref name="compressedData"/> are compressed input; the buffer itself may be longer.
+ /// </summary>
+ /// <param name="compressedData">The buffer holding data to decompress.</param>
+ /// <param name="length">The actual length of bytes to decompress from the buffer.</param>
+ /// <returns>A byte array holding the decompressed data.</returns>
+ abstract public byte[] Decompress(byte[] compressedData, int length); // NOTE(review): "public abstract" (as in the removed declaration) is the conventional modifier order
/// <summary>
/// Name of this codec type.
@@ -90,6 +104,25 @@ namespace Avro.File
}
/// <summary>
+ /// Represents a function capable of resolving a codec string
+ /// with a matching codec implementation a reader can use to decompress data, or returning null when it does not recognize the string.
+ /// </summary>
+ /// <param name="codecMetaString">The codec name to resolve, as passed to <see cref="Codec.CreateCodecFromString"/>.</param>
+ public delegate Codec CodecResolver(string codecMetaString);
+ // Registered resolvers, consulted in registration order by CreateCodecFromString before its built-in codecs.
+ private static readonly List<CodecResolver> _codecResolvers = new List<CodecResolver>(); // NOTE(review): static and never cleared; List<T> is not safe for concurrent Add and enumeration.
+
+ /// <summary>
+ /// Registers a function that will attempt to resolve a codec identifying string
+ /// with a matching codec implementation when reading compressed Avro data. Resolvers are tried in registration order, before the built-in codecs; the first non-null result is used.
+ /// </summary>
+ /// <param name="resolver">A function that returns a codec implementation for a codec string it recognizes, or null otherwise.</param>
+ public static void RegisterResolver(CodecResolver resolver)
+ {
+ _codecResolvers.Add(resolver); // NOTE(review): a null resolver is not rejected here and would throw when CreateCodecFromString invokes it
+ }
+
+ /// <summary>
/// Factory method to return child codec instance based on Codec.Type.
/// </summary>
/// <param name="codecType">Type of the codec.</param>
@@ -114,6 +147,15 @@ namespace Avro.File
/// <returns>Codec based on type.</returns>
public static Codec CreateCodecFromString(string codecType)
{
+ foreach (var resolver in _codecResolvers)
+ {
+ var candidateCodec = resolver(codecType);
+ if (candidateCodec != null)
+ {
+ return candidateCodec;
+ }
+ }
+
switch (codecType)
{
case DataFileConstants.DeflateCodec:
diff --git a/lang/csharp/src/apache/main/File/DataFileReader.cs b/lang/csharp/src/apache/main/File/DataFileReader.cs
index 34b4c6e..c96b68a 100644
--- a/lang/csharp/src/apache/main/File/DataFileReader.cs
+++ b/lang/csharp/src/apache/main/File/DataFileReader.cs
@@ -334,7 +334,7 @@ namespace Avro.File
if (HasNextBlock())
{
_currentBlock = NextRawBlock(_currentBlock);
- _currentBlock.Data = _codec.Decompress(_currentBlock.Data);
+ _currentBlock.Data = _codec.Decompress(_currentBlock.Data, (int)this._blockSize);
_datumDecoder = new BinaryDecoder(_currentBlock.GetDataAsStream());
}
}
diff --git a/lang/csharp/src/apache/main/File/DeflateCodec.cs b/lang/csharp/src/apache/main/File/DeflateCodec.cs
index 1a4d9a6..4de017a 100644
--- a/lang/csharp/src/apache/main/File/DeflateCodec.cs
+++ b/lang/csharp/src/apache/main/File/DeflateCodec.cs
@@ -54,8 +54,9 @@ namespace Avro.File
}
/// <inheritdoc/>
- public override byte[] Decompress(byte[] compressedData)
+ public override byte[] Decompress(byte[] compressedData, int length)
{
+
MemoryStream inStream = new MemoryStream(compressedData);
MemoryStream outStream = new MemoryStream();
diff --git a/lang/csharp/src/apache/main/File/NullCodec.cs b/lang/csharp/src/apache/main/File/NullCodec.cs
index 1255941..78803dd 100644
--- a/lang/csharp/src/apache/main/File/NullCodec.cs
+++ b/lang/csharp/src/apache/main/File/NullCodec.cs
@@ -45,7 +45,7 @@ namespace Avro.File
}
/// <inheritdoc/>
- public override byte[] Decompress(byte[] compressedData)
+ public override byte[] Decompress(byte[] compressedData, int length) // NOTE(review): length is ignored; the whole buffer is returned, so a caller relying on the result's Length may see bytes beyond length
{
return compressedData;
}
diff --git a/lang/csharp/src/apache/test/File/FileTests.cs b/lang/csharp/src/apache/test/File/FileTests.cs
index 9229bf4..5cde994 100644
--- a/lang/csharp/src/apache/test/File/FileTests.cs
+++ b/lang/csharp/src/apache/test/File/FileTests.cs
@@ -819,6 +819,57 @@ namespace Avro.Test.File
}
}
+ /// <summary>
+ /// Round-trips a record through each reader/writer pair from GenericOptions using FakeZstdCodec (name "zstd"), which the reader is expected to obtain through the registered codec resolver.
+ /// </summary>
+ /// <param name="codecToUse">The codec name for this case. NOTE(review): currently unused; every case writes with fakeCodec ("zstd").</param>
+ /// <param name="expectResolverProvidedCodec">Whether this case's resolver is expected to supply the codec. NOTE(review): resolvers from earlier cases stay registered and are consulted first, so the observed flag depends on which cases ran before.</param>
+ [TestCase("zstd", true)]
+ [TestCase("deflate", false)]
+ [TestCase("null", false)]
+ public void TestOptionalCodecs(string codecToUse, bool expectResolverProvidedCodec)
+ {
+ var resolverProvidedCodec = false; // set when this case's resolver hands out fakeCodec
+
+ var fakeCodec = new FakeZstdCodec();
+ Codec codecResolver(string codecString)
+ {
+ if (codecString == "zstd")
+ {
+ resolverProvidedCodec = true;
+ return fakeCodec;
+ }
+
+ return null;
+ }
+
+ Codec.RegisterResolver(codecResolver); // NOTE(review): never unregistered, so it stays in Codec's static resolver list for later tests
+
+ RecordSchema schema = Schema.Parse( "{\"type\":\"record\", \"name\":\"n\", \"fields\":[{\"name\":\"f1\", \"type\":\"string\"},"
+ + "{\"name\":\"f2\", \"type\":\"string\"}]}" ) as RecordSchema;
+
+ foreach(var rwFactory in GenericOptions<GenericRecord>())
+ {
+ using (MemoryStream dataFileOutputStream = new MemoryStream())
+ {
+ using (var writer = rwFactory.CreateWriter(dataFileOutputStream, schema, fakeCodec)) // always fakeCodec, regardless of codecToUse
+ {
+ writer.Append(mkRecord(new [] { "f1", "f1val", "f2", "f2val" }, schema));
+ }
+
+ using (var dataFileInputStream = new MemoryStream(dataFileOutputStream.ToArray()))
+ using (IFileReader<GenericRecord> reader = rwFactory.CreateReader(dataFileInputStream, schema))
+ {
+ GenericRecord result = reader.Next();
+ Assert.AreEqual("f1val", result["f1"]);
+ Assert.AreEqual("f2val", result["f2"]);
+ }
+ }
+ }
+
+ Assert.AreEqual(expectResolverProvidedCodec, resolverProvidedCodec);
+ }
+
private bool CheckPrimitive<T>(Stream input, T value, ReaderWriterSet<T>.ReaderFactory createReader)
{
IFileReader<T> reader = createReader(input, null);
@@ -1048,4 +1099,40 @@ namespace Avro.Test.File
return string.Format("Name: {0}, Age: {1}", name, age);
}
}
+
+ class FakeZstdCodec : Codec // Test double: delegates all work to DeflateCodec but reports its name as "zstd".
+ {
+ private DeflateCodec _codec = new DeflateCodec(); // NOTE(review): never reassigned; could be readonly
+ public override byte[] Compress(byte[] uncompressedData)
+ {
+ return _codec.Compress(uncompressedData);
+ }
+
+ public override void Compress(MemoryStream inputStream, MemoryStream outputStream)
+ {
+ _codec.Compress(inputStream, outputStream);
+ }
+
+ public override byte[] Decompress(byte[] compressedData, int length)
+ {
+ return _codec.Decompress(compressedData, length);
+ }
+
+ public override bool Equals(object other)
+ {
+ if (other == null) return false;
+
+ return this == other; // reference equality: only the same instance compares equal
+ }
+
+ public override int GetHashCode()
+ {
+ return GetName().GetHashCode(); // same value for every instance, which is consistent with reference equality
+ }
+
+ public override string GetName()
+ {
+ return "zstd"; // must match the string TestOptionalCodecs' resolver checks for
+ }
+ }
}