Posted to jira@kafka.apache.org by "Ying Zheng (JIRA)" <ji...@apache.org> on 2018/01/07 22:23:00 UTC

[jira] [Updated] (KAFKA-6430) Improve Kafka GZip compression performance

     [ https://issues.apache.org/jira/browse/KAFKA-6430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ying Zheng updated KAFKA-6430:
------------------------------
    Description: 
To compress messages, Kafka uses DataOutputStream on top of GZIPOutputStream:
    new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
To decompress messages, Kafka uses DataInputStream on top of GZIPInputStream:
    new DataInputStream(new GZIPInputStream(buffer));
This is straightforward but inefficient. For each message, in addition to the key and value data, Kafka has to write about 30 bytes of metadata (the exact count varies slightly between Kafka versions), including the magic byte, checksum, timestamp, offset, key length, and value length. For each of these bytes, Java's DataOutputStream calls the single-byte write(int) method once. Here is the writeInt() method in DataOutputStream, which writes the 4 bytes separately in big-endian order:
    public final void writeInt(int v) throws IOException {
        out.write((v >>> 24) & 0xFF);
        out.write((v >>> 16) & 0xFF);
        out.write((v >>>  8) & 0xFF);
        out.write((v >>>  0) & 0xFF);
        incCount(4);
    }
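
To make the byte order and the per-byte forwarding visible, here is a minimal sketch. RecordingSink and WriteIntDemo are hypothetical helper names introduced only for this illustration; they are not part of Kafka or the JDK. The sink records what DataOutputStream hands to the stream beneath it. The excerpt above is from JDK 8; other JDK versions may batch the four bytes differently, so the sketch prints the call counts rather than assuming them.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical helper: records every byte it receives and counts
// how many calls of each write variant delivered them.
class RecordingSink extends OutputStream {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    int singleByteCalls = 0;
    int arrayCalls = 0;

    @Override
    public void write(int b) {
        singleByteCalls++;
        bytes.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        arrayCalls++;
        bytes.write(b, off, len);
    }
}

class WriteIntDemo {
    // Writes one int through DataOutputStream and returns the sink for inspection.
    static RecordingSink writeOneInt(int value) {
        RecordingSink sink = new RecordingSink();
        try (DataOutputStream out = new DataOutputStream(sink)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink;
    }

    public static void main(String[] args) {
        RecordingSink sink = writeOneInt(0x01020304);
        byte[] written = sink.bytes.toByteArray();
        // Big-endian: the most significant byte (0x01) comes first.
        System.out.println("bytes written: " + written.length);
        System.out.println("first byte: " + written[0] + ", last byte: " + written[3]);
        // The split between the two counters depends on the JDK version.
        System.out.println("write(int) calls: " + sink.singleByteCalls);
        System.out.println("write(byte[], int, int) calls: " + sink.arrayCalls);
    }
}
```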

Unfortunately, GZIPOutputStream does not override the single-byte write(int) method. It only provides write(byte[], off, len), which calls the corresponding JNI zlib function. The write(int) method inherited from DeflaterOutputStream turns every single-byte write from DataOutputStream into a write(byte[], off, len) call in a very inefficient way (Oracle JDK 1.8 code):
class DeflaterOutputStream {
    public void write(int b) throws IOException {
        byte[] buf = new byte[1];
        buf[0] = (byte)(b & 0xff);
        write(buf, 0, 1);
    }

    public void write(byte[] b, int off, int len) throws IOException {
        if (def.finished()) {
            throw new IOException("write beyond end of stream");
        }
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        }
        if (!def.finished()) {
            def.setInput(b, off, len);
            while (!def.needsInput()) {
                deflate();
            }
        }
    }
}

class GZIPOutputStream extends DeflaterOutputStream {
    public synchronized void write(byte[] buf, int off, int len)
        throws IOException
    {
        super.write(buf, off, len);
        crc.update(buf, off, len);
    }
}

class Deflater {
    private native int deflateBytes(long addr, byte[] b, int off, int len, int flush);
}

class CRC32 {
    public void update(byte[] b, int off, int len) {
        if (b == null) {
            throw new NullPointerException();
        }
        if (off < 0 || len < 0 || off > b.length - len) {
            throw new ArrayIndexOutOfBoundsException();
        }
        crc = updateBytes(crc, b, off, len);
    }

    private native static int updateBytes(int crc, byte[] b, int off, int len);
}
For each metadata byte, the code above has to allocate a one-byte array, acquire several locks, and call two native JNI methods (Deflater.deflateBytes and CRC32.updateBytes). Each Kafka message carries about 30 such metadata bytes.
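
The per-byte cost can be observed from the outside by counting how often GZIPOutputStream.write(byte[], int, int) is entered. The sketch below is only an illustration: CountingGzipStream and GzipCallCountDemo are hypothetical names, not Kafka code, and writeByte() stands in for the metadata writes so that the count does not depend on how a particular JDK implements the multi-byte writers.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

// Hypothetical subclass used only for counting; each call counted here
// leads to the Deflater and CRC32 work described above.
class CountingGzipStream extends GZIPOutputStream {
    int arrayWriteCalls = 0;

    CountingGzipStream(OutputStream out) throws IOException {
        super(out);
    }

    @Override
    public synchronized void write(byte[] buf, int off, int len) throws IOException {
        arrayWriteCalls++;
        super.write(buf, off, len);
    }
}

class GzipCallCountDemo {
    // Writes metadataBytes single bytes and returns how many times the
    // GZIP stream's array write was entered, with or without a buffer in between.
    static int countCalls(int metadataBytes, boolean buffered) {
        try {
            CountingGzipStream gzip = new CountingGzipStream(new ByteArrayOutputStream());
            OutputStream target = buffered ? new BufferedOutputStream(gzip, 1 << 14) : gzip;
            DataOutputStream out = new DataOutputStream(target);
            for (int i = 0; i < metadataBytes; i++) {
                out.writeByte(i);
            }
            out.flush(); // pushes any buffered bytes down to the GZIP stream
            int calls = gzip.arrayWriteCalls;
            out.close();
            return calls;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("unbuffered calls: " + countCalls(30, false));
        System.out.println("buffered calls:   " + countCalls(30, true));
    }
}
```

Without the buffer, each of the 30 single-byte writes reaches the GZIP stream as its own one-byte array write. With the buffer, they arrive as one 30-byte write when the buffer is flushed.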

The call stack of Deflater.deflateBytes():
DeflaterOutputStream.write(int b) -> GZIPOutputStream.write(byte[] buf, int off, int len) -> DeflaterOutputStream.write(byte[] b, int off, int len) -> DeflaterOutputStream.deflate() -> Deflater.deflate(byte[] b, int off, int len) -> Deflater.deflate(byte[] b, int off, int len, int flush) -> Deflater.deflateBytes(long addr, byte[] b, int off, int len, int flush)

The call stack of CRC32.updateBytes():
DeflaterOutputStream.write(int b) -> GZIPOutputStream.write(byte[] buf, int off, int len) -> CRC32.update(byte[] b, int off, int len) -> CRC32.updateBytes(int crc, byte[] b, int off, int len)

At Uber, we found that adding a small buffer between DataOutputStream and GZIPOutputStream speeds up Kafka GZip compression by about 60% on average.
-    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
+    return new DataOutputStream(new BufferedOutputStream(new GZIPOutputStream(buffer, bufferSize), 1 << 14));
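
As a sanity check that the extra layer does not change what ends up in the stream, here is a small round-trip sketch. The wrapping order and the 1 << 14 buffer size are taken from the patch line above. Everything else is an assumption made for illustration: the class name BufferedGzipWriteDemo, the record layout, and the field values are hypothetical and do not reproduce Kafka's actual message format.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class BufferedGzipWriteDemo {
    // Same wrapping order as the patched line: Data -> Buffered -> GZIP.
    static DataOutputStream wrapForGzip(OutputStream buffer, int bufferSize) throws IOException {
        return new DataOutputStream(
                new BufferedOutputStream(new GZIPOutputStream(buffer, bufferSize), 1 << 14));
    }

    // Writes a made-up record: 1 magic byte, 8-byte timestamp, then
    // length-prefixed key and value. Closing the outermost stream flushes
    // the buffer before the GZIP trailer is written.
    static byte[] compressRecord(byte[] key, byte[] value) {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (DataOutputStream out = wrapForGzip(compressed, 1024)) {
            out.writeByte(1);
            out.writeLong(123456789L);
            out.writeInt(key.length);
            out.write(key);
            out.writeInt(value.length);
            out.write(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return compressed.toByteArray();
    }

    // Plain decompression, used here only to verify the round trip.
    static byte[] gunzip(byte[] compressed) {
        ByteArrayOutputStream plain = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                plain.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return plain.toByteArray();
    }

    public static void main(String[] args) {
        byte[] compressed = compressRecord(new byte[] {7}, new byte[] {1, 2, 3, 4, 5});
        System.out.println("uncompressed length: " + gunzip(compressed).length);
    }
}
```

One thing to keep in mind with this layering: bytes sitting in the BufferedOutputStream are not compressed until the buffer is flushed, so the outermost stream has to be flushed or closed before the underlying buffer is read.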

A similar fix also applies to GZip decompression.
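
On the read side, one way to apply the same idea is to place a BufferedInputStream between DataInputStream and GZIPInputStream. This is an assumption about what the similar fix looks like, since the issue text does not spell it out; the 1 << 14 size simply mirrors the write side, and BufferedGzipReadDemo is a hypothetical name used only here.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class BufferedGzipReadDemo {
    // Assumed read-side counterpart: Data -> Buffered -> GZIP.
    static DataInputStream wrapForGunzip(InputStream buffer) throws IOException {
        return new DataInputStream(
                new BufferedInputStream(new GZIPInputStream(buffer), 1 << 14));
    }

    // Compresses a payload in one array write, just to produce test input.
    static byte[] gzip(byte[] plain) {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(compressed)) {
            out.write(plain);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return compressed.toByteArray();
    }

    // Reads two big-endian ints; the single-byte reads issued by readInt()
    // are served from the buffer instead of hitting the inflater each time.
    static int[] readTwoInts(byte[] compressed) {
        try (DataInputStream in = wrapForGunzip(new ByteArrayInputStream(compressed))) {
            return new int[] {in.readInt(), in.readInt()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // ByteBuffer is big-endian by default, matching DataInputStream.
        byte[] plain = ByteBuffer.allocate(8).putInt(42).putInt(7).array();
        int[] values = readTwoInts(gzip(plain));
        System.out.println(values[0] + " " + values[1]);
    }
}
```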

We tested this improvement on Kafka 0.10.2 / Oracle JDK 8 with production traffic at Uber:
|| Topic || Avg Message Size (bytes) || Vanilla Kafka Throughput (MB/s) || Kafka w/ GZip Buffer Throughput (MB/s) || Speed Up ||
| topic 1 | 197 | 10.9 | 21.9 | 2.0 |
| topic 2 | 208 | 8.5 | 15.9 | 1.9 |
| topic 3 | 624 | 15.3 | 20.2 | 1.3 |
| topic 4 | 766 | 28.0 | 43.7 | 1.6 |
| topic 5 | 1168 | 22.9 | 25.4 | 1.1 |
| topic 6 | 165021 | 9.1 | 9.2 | 1.0 |
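
The Speed Up column is the buffered throughput divided by the vanilla throughput, rounded to one decimal place. The short sketch below recomputes it from the two throughput columns of the table; SpeedUpTable is a hypothetical name used only for this illustration.

```java
class SpeedUpTable {
    // Throughput columns copied from the table, in MB/s, topics 1 to 6.
    static final double[] VANILLA = {10.9, 8.5, 15.3, 28.0, 22.9, 9.1};
    static final double[] BUFFERED = {21.9, 15.9, 20.2, 43.7, 25.4, 9.2};

    // Ratio of buffered to vanilla throughput, rounded to one decimal place.
    static double speedUp(double vanilla, double buffered) {
        return Math.round(buffered / vanilla * 10.0) / 10.0;
    }

    public static void main(String[] args) {
        for (int i = 0; i < VANILLA.length; i++) {
            System.out.println("topic " + (i + 1) + ": "
                    + speedUp(VANILLA[i], BUFFERED[i]));
        }
    }
}
```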




> Improve Kafka GZip compression performance
> ------------------------------------------
>
>                 Key: KAFKA-6430
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6430
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, core
>            Reporter: Ying Zheng
>            Priority: Minor


