You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/02/16 23:03:36 UTC

[kafka] branch trunk updated: KAFKA-6430: Add buffer for gzip streams (#4537)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 13caded  KAFKA-6430: Add buffer for gzip streams (#4537)
13caded is described below

commit 13caded15e21bff450f6f042e86f856046e0f40a
Author: ying-zheng <zh...@rocketmail.com>
AuthorDate: Fri Feb 16 15:03:32 2018 -0800

    KAFKA-6430: Add buffer for gzip streams (#4537)
    
    As described in the JIRA ticket (KAFKA-6430), buffering the gzip input and output streams can double throughput.
---
 .../org/apache/kafka/common/record/CompressionType.java    | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 16d6e01..9b3bfc4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.invoke.MethodHandle;
@@ -49,8 +51,10 @@ public enum CompressionType {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
             try {
-                // GZIPOutputStream has a default buffer size of 512 bytes, which is too small
-                return new GZIPOutputStream(buffer, 8 * 1024);
+                // Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to
+                // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small
+                // number of bytes to write (potentially a single byte)
+                return new BufferedOutputStream(new GZIPOutputStream(buffer, 8 * 1024), 16 * 1024);
             } catch (Exception e) {
                 throw new KafkaException(e);
             }
@@ -59,7 +63,11 @@ public enum CompressionType {
         @Override
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             try {
-                return new GZIPInputStream(new ByteBufferInputStream(buffer));
+                // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
+                // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
+                // number of bytes (potentially a single byte)
+                return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
+                        16 * 1024);
             } catch (Exception e) {
                 throw new KafkaException(e);
             }

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.