You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2022/06/14 12:02:50 UTC

[pulsar-dotpulsar] branch master updated: Implemented support for ZstdSharp.Port

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c34db7  Implemented support for ZstdSharp.Port
0c34db7 is described below

commit 0c34db7059ad07504e2337ad4d9596365853e9e6
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Tue Jun 14 14:02:34 2022 +0200

    Implemented support for ZstdSharp.Port
---
 .../Internal/Compression/CompressionFactories.cs   |  24 ++-
 .../Internal/Compression/ZstdSharpCompression.cs   | 180 +++++++++++++++++++++
 tests/DotPulsar.Tests/DotPulsar.Tests.csproj       |   1 +
 .../Internal/Compression/ZstdCompressionTests.cs   |  11 +-
 ...essionTests.cs => ZstdSharpCompressionTests.cs} |   4 +-
 5 files changed, 214 insertions(+), 6 deletions(-)

diff --git a/src/DotPulsar/Internal/Compression/CompressionFactories.cs b/src/DotPulsar/Internal/Compression/CompressionFactories.cs
index b9eb1ef..566bc65 100644
--- a/src/DotPulsar/Internal/Compression/CompressionFactories.cs
+++ b/src/DotPulsar/Internal/Compression/CompressionFactories.cs
@@ -27,17 +27,35 @@ public static class CompressionFactories
         _compressorFactories = new List<ICompressorFactory>();
         _decompressorFactories = new List<IDecompressorFactory>();
 
+        LoadSupportForLz4();
+        LoadSupportForSnappy();
+        LoadSupportForZlib();
+        LoadSupportForZstd();
+    }
 
+    private static void LoadSupportForLz4()
+    {
         if (Lz4Compression.TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory))
             Add(compressorFactory, decompressorFactory);
+    }
 
-        if (SnappyCompression.TryLoading(out compressorFactory, out decompressorFactory))
+    private static void LoadSupportForSnappy()
+    {
+        if (SnappyCompression.TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory))
             Add(compressorFactory, decompressorFactory);
+    }
 
-        if (ZlibCompression.TryLoading(out compressorFactory, out decompressorFactory))
+    private static void LoadSupportForZlib()
+    {
+        if (ZlibCompression.TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory))
             Add(compressorFactory, decompressorFactory);
+    }
 
-        if (ZstdCompression.TryLoading(out compressorFactory, out decompressorFactory))
+    private static void LoadSupportForZstd()
+    {
+        if (ZstdSharpCompression.TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory))
+            Add(compressorFactory, decompressorFactory);
+        else if (ZstdCompression.TryLoading(out compressorFactory, out decompressorFactory))
             Add(compressorFactory, decompressorFactory);
     }
 
diff --git a/src/DotPulsar/Internal/Compression/ZstdSharpCompression.cs b/src/DotPulsar/Internal/Compression/ZstdSharpCompression.cs
new file mode 100644
index 0000000..9501f74
--- /dev/null
+++ b/src/DotPulsar/Internal/Compression/ZstdSharpCompression.cs
@@ -0,0 +1,180 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Compression;
+
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Abstractions;
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+
+public static class ZstdSharpCompression
+{
+    public delegate Span<byte> Wrap(ReadOnlySpan<byte> src);
+    public delegate int Unwrap(byte[] src, byte[] dst, int offset);
+
+    public static bool TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory)
+    {
+        try
+        {
+            var assembly = Assembly.Load("ZstdSharp");
+
+            var definedTypes = assembly.DefinedTypes.ToArray();
+
+            var decompressorType = FindDecompressor(definedTypes, "ZstdSharp.Decompressor");
+            var decompressorMethods = decompressorType.GetMethods(BindingFlags.Public | BindingFlags.Instance);
+            var unwrapMethod = FindUnwrap(decompressorMethods);
+
+            var compressorType = FindCompressor(definedTypes, "ZstdSharp.Compressor");
+            var compressorMethods = compressorType.GetMethods(BindingFlags.Public | BindingFlags.Instance);
+            var wrapMethod = FindWrap(compressorMethods);
+
+            compressorFactory = new CompressorFactory(PulsarApi.CompressionType.Zstd, () =>
+            {
+                var compressor = Activator.CreateInstance(compressorType, 0);
+                if (compressor is null)
+                    throw new Exception($"Activator.CreateInstance returned null when trying to create a {compressorType.FullName}");
+
+                var wrap = (Wrap) wrapMethod.CreateDelegate(typeof(Wrap), compressor);
+                return new Compressor(CreateCompressor(wrap), (IDisposable) compressor);
+            });
+
+            decompressorFactory = new DecompressorFactory(PulsarApi.CompressionType.Zstd, () =>
+            {
+                var decompressor = Activator.CreateInstance(decompressorType);
+                if (decompressor is null)
+                    throw new Exception($"Activator.CreateInstance returned null when trying to create a {decompressorType.FullName}");
+
+                var unwrap = (Unwrap) unwrapMethod.CreateDelegate(typeof(Unwrap), decompressor);
+                return new Decompressor(CreateDecompressor(unwrap), (IDisposable) decompressor);
+            });
+
+            return CompressionTester.TestCompression(compressorFactory, decompressorFactory);
+        }
+        catch
+        {
+            // Ignore
+        }
+
+        compressorFactory = null;
+        decompressorFactory = null;
+
+        return false;
+    }
+
+    private static TypeInfo FindDecompressor(IEnumerable<TypeInfo> types, string fullName)
+    {
+        foreach (var type in types)
+        {
+            if (type.FullName is null || !type.FullName.Equals(fullName))
+                continue;
+
+            if (type.IsPublic &&
+                type.IsClass &&
+                !type.IsAbstract &&
+                type.ImplementedInterfaces.Contains(typeof(IDisposable)) &&
+                type.GetConstructor(Type.EmptyTypes) is not null)
+                return type;
+
+            break;
+        }
+
+        throw new Exception($"{fullName} as a public class with an empty public constructor and implementing IDisposable was not found");
+    }
+
+    private static TypeInfo FindCompressor(IEnumerable<TypeInfo> types, string fullName)
+    {
+        foreach (var type in types)
+        {
+            if (type.FullName is null || !type.FullName.Equals(fullName))
+                continue;
+
+            if (type.IsPublic &&
+                type.IsClass &&
+                !type.IsAbstract &&
+                type.ImplementedInterfaces.Contains(typeof(IDisposable)) &&
+                type.GetConstructor(new[] { typeof(int) }) is not null)
+                return type;
+
+            break;
+        }
+
+        throw new Exception($"{fullName} as a public class with an public constructor taking an integer and implementing IDisposable was not found");
+    }
+
+    private static MethodInfo FindWrap(MethodInfo[] methods)
+    {
+        const string name = "Wrap";
+
+        foreach (var method in methods)
+        {
+            if (method.Name != name || method.ReturnType != typeof(Span<byte>))
+                continue;
+
+            var parameters = method.GetParameters();
+            if (parameters.Length != 1)
+                continue;
+
+            if (parameters[0].ParameterType != typeof(ReadOnlySpan<byte>))
+                continue;
+
+            return method;
+        }
+
+        throw new Exception($"A method with the name '{name}' matching the delegate was not found");
+    }
+
+    private static MethodInfo FindUnwrap(MethodInfo[] methods)
+    {
+        const string name = "Unwrap";
+
+        foreach (var method in methods)
+        {
+            if (method.Name != name || method.ReturnType != typeof(int))
+                continue;
+
+            var parameters = method.GetParameters();
+            if (parameters.Length != 3)
+                continue;
+
+            if (parameters[0].ParameterType != typeof(byte[]) ||
+                parameters[1].ParameterType != typeof(byte[]) ||
+                parameters[2].ParameterType != typeof(int))
+                continue;
+
+            return method;
+        }
+
+        throw new Exception($"A method with the name '{name}' matching the delegate was not found");
+    }
+
+    private static Func<ReadOnlySequence<byte>, int, ReadOnlySequence<byte>> CreateDecompressor(Unwrap decompress)
+    {
+        return (source, size) =>
+        {
+            var decompressed = new byte[size];
+            var bytesDecompressed = decompress(source.ToArray(), decompressed, 0);
+            if (size == bytesDecompressed)
+                return new ReadOnlySequence<byte>(decompressed);
+
+            throw new CompressionException($"ZstdSharp.Decompressor returned {bytesDecompressed} but expected {size}");
+        };
+    }
+
+    private static Func<ReadOnlySequence<byte>, ReadOnlySequence<byte>> CreateCompressor(Wrap compress)
+        => (source) => new ReadOnlySequence<byte>(compress(source.ToArray().AsSpan()).ToArray());
+}
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index f473d10..7ed0d84 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -23,6 +23,7 @@
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>
     <PackageReference Include="ZstdNet" Version="1.4.5" />
+    <PackageReference Include="ZstdSharp.Port" Version="0.6.1" />
   </ItemGroup>
 
   <ItemGroup>
diff --git a/tests/DotPulsar.Tests/Internal/Compression/ZstdCompressionTests.cs b/tests/DotPulsar.Tests/Internal/Compression/ZstdCompressionTests.cs
index 87460fa..586dfa9 100644
--- a/tests/DotPulsar.Tests/Internal/Compression/ZstdCompressionTests.cs
+++ b/tests/DotPulsar.Tests/Internal/Compression/ZstdCompressionTests.cs
@@ -17,6 +17,7 @@ namespace DotPulsar.Tests.Internal.Compression;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Compression;
 using FluentAssertions;
+using System.Runtime.InteropServices;
 using Xunit;
 
 [Trait("Category", "Unit")]
@@ -27,7 +28,15 @@ public class ZstdCompressionTests
     {
         // Arrange
         var couldLoad = ZstdCompression.TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory);
-        couldLoad.Should().BeTrue();
+
+        if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
+        {
+            couldLoad.Should().BeFalse();
+            return;
+        }
+        else
+            couldLoad.Should().BeTrue();
+
         using var compressor = compressorFactory!.Create();
         using var decompressor = decompressorFactory!.Create();
 
diff --git a/tests/DotPulsar.Tests/Internal/Compression/ZstdCompressionTests.cs b/tests/DotPulsar.Tests/Internal/Compression/ZstdSharpCompressionTests.cs
similarity index 86%
copy from tests/DotPulsar.Tests/Internal/Compression/ZstdCompressionTests.cs
copy to tests/DotPulsar.Tests/Internal/Compression/ZstdSharpCompressionTests.cs
index 87460fa..ca34f83 100644
--- a/tests/DotPulsar.Tests/Internal/Compression/ZstdCompressionTests.cs
+++ b/tests/DotPulsar.Tests/Internal/Compression/ZstdSharpCompressionTests.cs
@@ -20,13 +20,13 @@ using FluentAssertions;
 using Xunit;
 
 [Trait("Category", "Unit")]
-public class ZstdCompressionTests
+public class ZstdSharpCompressionTests
 {
     [Fact]
     public void Compression_GivenDataToCompressAndDecompress_ShouldReturnOriginalData()
     {
         // Arrange
-        var couldLoad = ZstdCompression.TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory);
+        var couldLoad = ZstdSharpCompression.TryLoading(out ICompressorFactory? compressorFactory, out IDecompressorFactory? decompressorFactory);
         couldLoad.Should().BeTrue();
         using var compressor = compressorFactory!.Create();
         using var decompressor = decompressorFactory!.Create();