Posted to users@kafka.apache.org by Jiamei Xie <Ji...@arm.com> on 2020/04/02 04:01:15 UTC

How is decompression called in DefaultRecordBatch#iterator?

Hi all,
There is a huge latency gap between arm and x86 when running kafka-producer-perf-test.sh with compression enabled, especially when the codec is zstd. So I looked into the source code. It seems the response from the broker is too slow on arm, and LogValidator#validateMessagesAndAssignOffsetsCompressed takes too much time.

I have read the source code and know that the broker validates messages from the producer by invoking LogValidator#validateMessagesAndAssignOffsetsCompressed when they are compressed. It seems the decompression is done by compressedIterator. I think decompression is triggered when iterator.next() is invoked. Do I misunderstand this code?

public Iterator<Record> iterator() {
    if (count() == 0)
        return Collections.emptyIterator();

    if (!isCompressed())
        return uncompressedIterator();
    // for a normal iterator, we cannot ensure that the underlying compression stream is closed,
    // so we decompress the full record set here. Use cases which call for a lower memory footprint
    // can use `streamingIterator` at the cost of additional complexity
    try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING, false)) {
        List<Record> records = new ArrayList<>(count());
        while (iterator.hasNext())
            records.add(iterator.next());
        return records.iterator();
    }
}
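
For contrast, here is a minimal sketch of the lazy path mentioned in the comment above (batch is just my own variable name for a DefaultRecordBatch instance). With streamingIterator, bytes should be pulled through the decompression stream incrementally inside each next() call, instead of the whole batch being decompressed up front inside iterator():

try (CloseableIterator<Record> streaming = batch.streamingIterator(BufferSupplier.create())) {
    while (streaming.hasNext()) {
        Record record = streaming.next(); // decompression work happens here
    }
}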

compressedIterator returns a StreamRecordIterator, which has a method doReadRecord that calls DefaultRecord.readFrom. Does the decompression actually occur in DefaultRecord.readFrom? I am really confused.

private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
    final ByteBuffer buffer = this.buffer.duplicate();
    buffer.position(RECORDS_OFFSET);

    final DataInputStream inputStream = new DataInputStream(compressionType().wrapForInput(buffer, magic(),
        bufferSupplier));

    if (skipKeyValue) {
        // this buffer is used to skip length delimited fields like key, value, headers
        byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];

        return new StreamRecordIterator(inputStream) {
            @Override
            protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException {
                return DefaultRecord.readPartiallyFrom(inputStream, skipArray, baseOffset, firstTimestamp, baseSequence, logAppendTime);
            }
        };
    } else {
        return new StreamRecordIterator(inputStream) {
            @Override
            protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException {
                return DefaultRecord.readFrom(inputStream, baseOffset, firstTimestamp, baseSequence, logAppendTime);
            }
        };
    }
}

In compressedIterator, taking zstd as an example, wrapForInput is:


public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
    try {
        return (InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
    } catch (Throwable e) {
        throw new KafkaException(e);
    }
}

Method wrapForInput returns an InputStream. For ZstdInputStream, decompressStream is invoked when read() or read(byte[], int, int) is called.
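
To convince myself where the decompression actually runs, I wrote a small standalone test (assuming zstd-jni is on the classpath; ZstdInputStream and ZstdOutputStream are from com.github.luben.zstd, the library Kafka wraps here):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;

public class ZstdReadDemo {
    public static void main(String[] args) throws IOException {
        // Compress a small payload so we have valid zstd input.
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (ZstdOutputStream out = new ZstdOutputStream(compressed)) {
            out.write("hello zstd".getBytes(StandardCharsets.UTF_8));
        }
        // Nothing has been decompressed at this point. The readByte()
        // below propagates to ZstdInputStream.read(), which is where
        // decompressStream gets invoked.
        try (DataInputStream in = new DataInputStream(
                new ZstdInputStream(new ByteArrayInputStream(compressed.toByteArray())))) {
            System.out.println("first decompressed byte: " + in.readByte());
        }
    }
}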

To estimate where the time goes in readFrom, I added some logging. Below is an example that measures the time spent in ByteUtils.readVarint(input). Averaged over 1000 records (the sum / 1000 in the log line below), I get 3966 ns per call on arm and 1484 ns per call on x86.
// i and sum are static fields I added for this instrumentation.
public static DefaultRecord readFrom(DataInput input,
                                     long baseOffset,
                                     long baseTimestamp,
                                     int baseSequence,
                                     Long logAppendTime) throws IOException {
    long startTime = 0;
    if (i >= 5000 && i < 6000)
        startTime = System.nanoTime();

    // measure the time consumed by ByteUtils.readVarint
    int sizeOfBodyInBytes = ByteUtils.readVarint(input);
    if (i >= 5000 && i < 6000) {
        long endTime = System.nanoTime();
        sum += endTime - startTime;
    }
    if (i == 6000)
        log.warn("#### readVarint {} ", sum / 1000);

    ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes);
    input.readFully(recordBuffer.array(), 0, sizeOfBodyInBytes);
    int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes;
    i++;
    return readFrom(recordBuffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp,
            baseSequence, logAppendTime);
}
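
To check whether readVarint itself is slow on ARM, as opposed to the stream reads underneath it, one could time it against a plain in-memory buffer that needs no decompression. A rough sketch (a proper comparison would use JMH; the 5,000,000-byte size is arbitrary, and each 0x01 byte decodes as a single one-byte varint):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;
import org.apache.kafka.common.utils.ByteUtils;

public class VarintTiming {
    public static void main(String[] args) throws IOException {
        byte[] data = new byte[5_000_000];
        Arrays.fill(data, (byte) 1); // each byte is one complete varint
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        long start = System.nanoTime();
        for (int i = 0; i < data.length; i++)
            ByteUtils.readVarint(in);
        long elapsed = System.nanoTime() - start;
        System.out.println("avg ns per readVarint on a plain buffer: "
                + (double) elapsed / data.length);
    }
}

If this shows no arm/x86 gap, then the gap measured above comes from the bytes being pulled through the decompressing stream, not from the varint decoding itself.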

Method readVarint is:

public static int readVarint(DataInput in) throws IOException {
    int value = readUnsignedVarint(in);
    return (value >>> 1) ^ -(value & 1);
}
which should not show such a huge gap between ARM and x86 if it were just the >>> and ^ operations.
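
However, readUnsignedVarint itself reads from the stream. If I read ByteUtils correctly, it looks roughly like this (a sketch; the real code throws a Kafka-specific exception on overflow):

public static int readUnsignedVarint(DataInput in) throws IOException {
    int value = 0;
    int i = 0;
    int b;
    // Each readByte() here goes through the DataInputStream into the
    // decompressing InputStream (e.g. ZstdInputStream), so timing
    // readVarint also times decompression work.
    while (((b = in.readByte()) & 0x80) != 0) {
        value |= (b & 0x7f) << i;
        i += 7;
        if (i > 28)
            throw new IllegalArgumentException("varint is too long: " + value);
    }
    value |= b << i;
    return value;
}

So I suspect the 3966 vs 1484 ns gap above is dominated by decompression pulled in through readByte(), not by the shift and xor arithmetic.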

I am stuck here. Can anyone give some tips on how to debug this issue effectively? And where exactly is decompression called?



The default compression level for zstd is 3, and I got the latency below with lzbench.

time (us)

compress      x86      arm      arm/x86
zstd          63875    88423    1.384313
snappy        36838    43936    1.192681
lz4           26125    34254    1.311158

decompress    x86      arm      arm/x86
zstd          14255    25599    1.795791
snappy        12569    18588    1.478877
lz4           7005     11475    1.638116

And the latency reported by kafka-producer-perf-test.sh has a huge gap.

Average latency (ms)

compression    x86     arm
gzip           1.2     19.43
snappy         7.44    346.79
lz4            4.65    352.67
zstd           7.1     3692.8

Best Wishes,
Jiamei
