You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/02/16 23:03:36 UTC
[kafka] branch trunk updated: KAFKA-6430: Add buffer for gzip streams (#4537)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 13caded KAFKA-6430: Add buffer for gzip streams (#4537)
13caded is described below
commit 13caded15e21bff450f6f042e86f856046e0f40a
Author: ying-zheng <zh...@rocketmail.com>
AuthorDate: Fri Feb 16 15:03:32 2018 -0800
KAFKA-6430: Add buffer for gzip streams (#4537)
As described in the JIRA ticket (KAFKA-6430), buffering the gzip input and output streams can double throughput.
---
.../org/apache/kafka/common/record/CompressionType.java | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 16d6e01..9b3bfc4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandle;
@@ -49,8 +51,10 @@ public enum CompressionType {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
try {
- // GZIPOutputStream has a default buffer size of 512 bytes, which is too small
- return new GZIPOutputStream(buffer, 8 * 1024);
+ // Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to
+ // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small
+ // number of bytes to write (potentially a single byte)
+ return new BufferedOutputStream(new GZIPOutputStream(buffer, 8 * 1024), 16 * 1024);
} catch (Exception e) {
throw new KafkaException(e);
}
@@ -59,7 +63,11 @@ public enum CompressionType {
@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
- return new GZIPInputStream(new ByteBufferInputStream(buffer));
+ // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
+ // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
+ // number of bytes (potentially a single byte)
+ return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
+ 16 * 1024);
} catch (Exception e) {
throw new KafkaException(e);
}
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.