You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "divijvaidya (via GitHub)" <gi...@apache.org> on 2023/06/07 13:18:25 UTC

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13814: KAFKA-15057: Use new interface from zstd-jni

divijvaidya commented on code in PR #13814:
URL: https://github.com/apache/kafka/pull/13814#discussion_r1221584156


##########
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##########
@@ -45,27 +44,47 @@ public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
         }
     }
 
-    public static InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+    public static InputStream wrapForInput(ByteBuffer compressedData, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
         try {
-            // We use our own BufferSupplier instead of com.github.luben.zstd.RecyclingBufferPool since our
-            // implementation doesn't require locking or soft references. The buffer allocated by this buffer pool is
-            // used by zstd-jni for 1\ reading compressed data from input stream into a buffer before passing it over JNI
-            // 2\ implementation of skip inside zstd-jni where buffer is obtained and released with every call
-            final BufferPool bufferPool = new BufferPool() {
+            // The responsibility of closing the stream is pushed to the caller of this method.
+            final ZstdBufferDecompressingStreamNoFinalizer stream = new ZstdBufferDecompressingStreamNoFinalizer(compressedData);
+            return new InputStream() {
                 @Override
-                public ByteBuffer get(int capacity) {
-                    return decompressionBufferSupplier.get(capacity);
+                public int read() throws IOException {
+                    // prevent a call to underlying stream if no data is remaining
+                    if (!stream.hasRemaining())
+                        return -1;
+
+                    ByteBuffer temp = null;
+                    try {
+                        temp = decompressionBufferSupplier.get(1);
+                        int res = stream.read(temp);
+                        if (res <= 0) {
+                            return -1;
+                        }
+                        return Byte.toUnsignedInt(temp.get());
+                    } finally {
+                        if (temp != null)
+                            decompressionBufferSupplier.release(temp);
+                    }
                 }
+                @Override
+                public int read(byte[] b, int off, int len) throws IOException {
+                    // prevent a call to underlying stream if no data is remaining
+                    if (!stream.hasRemaining())
+                        return -1;
 
+                    int res = stream.read(ByteBuffer.wrap(b, off, len));
+                    if (res <= 0) {
+                        return -1;
+                    }
+                    return res;
+                }
                 @Override
-                public void release(ByteBuffer buffer) {
-                    decompressionBufferSupplier.release(buffer);
+                public void close() {
+                    stream.close();
                 }
             };

Review Comment:
   The underlying stream is not a stream of type `InputStream`. I couldn't change that while introducing the new interface because it would have been a backward incompatible change to existing interfaces of zstd-jni such as https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/ZstdDirectBufferDecompressingStreamNoFinalizer.java
   
   Hence, this wrapper converts the underlying zstd-jni stream into a InputStream.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org