Posted to jira@kafka.apache.org by "Antonio Tomac (Jira)" <ji...@apache.org> on 2021/06/27 00:34:00 UTC

[jira] [Created] (KAFKA-12999) NPE when accessing RecordHeader.key() concurrently

Antonio Tomac created KAFKA-12999:
-------------------------------------

             Summary: NPE when accessing RecordHeader.key() concurrently
                 Key: KAFKA-12999
                 URL: https://issues.apache.org/jira/browse/KAFKA-12999
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.8.0
            Reporter: Antonio Tomac


h2. Summary

After upgrading clients to {{2.8.0}}, reading a {{ConsumerRecord}}'s header keys started throwing an occasional {{java.lang.NullPointerException}} when the headers are accessed concurrently from multiple (two) threads.
h2. Where

The NPE is thrown here, at [RecordHeader.java:45|https://github.com/apache/kafka/blob/2.8.0/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L45]:
{code:java}
public String key() {
    if (key == null) {
        key = Utils.utf8(keyBuffer, keyBuffer.remaining()); // NPE here 
        keyBuffer = null;
    }
    return key;
}
{code}
h2. When/why

The issue was introduced by the changes in KAFKA-10438, which avoid unnecessarily creating the key's *{color:#0747a6}{{String}}{color}* when it might never be used.
 This is a good optimization, but the *lazy* initialization of the field {{RecordHeader.key}} causes a problem when two threads access/initialize it concurrently: {{key()}} is no longer a read-only operation, and there is a race between initializing {color:#0747a6}*{{key}}*{color} and nulling out {color:#0747a6}*{{keyBuffer}}*{color}. For example, both threads observe {{key == null}}; the first one decodes the key and sets {{keyBuffer = null}}; the second one then evaluates {{keyBuffer.remaining()}} on a null {{keyBuffer}} and throws the NPE.
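The interleaving described above can be replayed deterministically on a single thread. The sketch below is illustrative only: {{LazyHeader}} and {{RaceReplay}} are hypothetical classes (not Kafka code) that copy the lazy-init pattern of {{RecordHeader.key()}} shown above. "Thread B" is paused right after passing the {{key == null}} check, "thread A" runs {{key()}} to completion, and then B resumes and dereferences the buffer. Without synchronization the Java memory model also permits other orderings; this replay shows only the simplest one.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in (not Kafka code) that copies the lazy-init pattern of
// RecordHeader.key() in 2.8.0: decode on first call, then drop the buffer.
class LazyHeader {
    String key;
    ByteBuffer keyBuffer;

    LazyHeader(ByteBuffer keyBuffer) {
        this.keyBuffer = keyBuffer;
    }

    // Like Utils.utf8(keyBuffer, ...), this dereferences the buffer, so a null buffer throws NPE.
    static String decode(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    String key() {
        if (key == null) {
            key = decode(keyBuffer);
            keyBuffer = null;
        }
        return key;
    }
}

class RaceReplay {
    // Replays, on one thread, an interleaving that two real threads can hit.
    // Returns true if the replay ends in the NullPointerException from this report.
    static boolean replayInterleaving() {
        LazyHeader header = new LazyHeader(
                ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)));

        // Thread B: evaluates `key == null` and enters the if-block...
        boolean threadBSawNull = header.key == null;

        // Thread A: runs key() to completion, setting key and nulling keyBuffer.
        header.key();

        // Thread B resumes inside the if-block and reads the now-null keyBuffer.
        try {
            if (threadBSawNull) {
                LazyHeader.decode(header.keyBuffer);
            }
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
{code}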
h2. Simple workaround

After consuming record(s) and before passing a {color:#0747a6}*{{ConsumerRecord}}*{color} to multiple processing threads, eagerly initialize all header keys, either by iterating through the headers and invoking {color:#0747a6}*{{key()}}*{color} on each, or by calling {color:#0747a6}*{{ConsumerRecord.headers().hashCode()}}*{color}, which initializes all keys (and header values too).
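A minimal sketch of the workaround, assuming records are handed to workers through an {{ExecutorService}}. It uses a hypothetical {{LazyHeader}} class (not Kafka code) with the same lazy-init pattern; with the real client the equivalent warm-up is iterating {{record.headers()}} and calling {{key()}} (and {{value()}}) on each {{Header}}. The sketch relies on the {{java.util.concurrent}} guarantee that actions before {{submit()}} happen-before the submitted task runs, so workers see the already-initialized field and only take the read-only path; a different hand-off channel would need a similar happens-before edge.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in (not Kafka code) with the same lazy-init pattern as
// RecordHeader.key() in 2.8.0.
class LazyHeader {
    private String key;
    private ByteBuffer keyBuffer;

    LazyHeader(String key) {
        this.keyBuffer = ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    }

    String key() {
        if (key == null) {
            key = StandardCharsets.UTF_8.decode(keyBuffer.duplicate()).toString();
            keyBuffer = null;
        }
        return key;
    }
}

class EagerInitWorkaround {
    // Workaround: force every lazy key on the consuming thread, before any hand-off.
    static void warmUp(List<LazyHeader> headers) {
        for (LazyHeader header : headers) {
            header.key();
        }
    }

    // Warms up, then reads every key from two worker threads; returns the keys they saw.
    static List<String> readConcurrently(List<LazyHeader> headers) {
        warmUp(headers);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Each submit() happens-after warmUp(), so the workers see the initialized key.
            List<Future<String>> results = new ArrayList<>();
            for (LazyHeader header : headers) {
                Future<String> first = pool.submit(header::key);
                Future<String> second = pool.submit(header::key);
                results.add(first);
                results.add(second);
            }
            List<String> keys = new ArrayList<>();
            for (Future<String> result : results) {
                keys.add(result.get());
            }
            return keys;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
{code}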
h2. Consequences

The current implementation makes {{RecordHeader}} not thread-safe even for (seemingly) read-only access.
h2. Reproducibility

With enough iterations the failure always reproduces (at least on my local machine).
 Here is a minimal snippet that reproduces it:
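{code:java}
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.jupiter.api.Test;

public class RecordHeaderConcurrencyTest {
    @Test
    public void testConcurrentKeyInit() throws ExecutionException, InterruptedException {
        ByteBuffer keyBuffer = ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8));
        ByteBuffer valueBuffer = ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8));

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < 1_000_000; i++) {
                // A fresh header each iteration, so its key has not been initialized yet
                RecordHeader header = new RecordHeader(keyBuffer, valueBuffer);
                // The worker thread and the test thread now race to initialize key()
                Future<String> future = executorService.submit(header::key);
                assertEquals("key", header.key());
                assertEquals("key", future.get());
            }
        } finally {
            executorService.shutdown();
        }
    }
}
{code}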
h2. Possible solution #1

Leave the implementation as-is, but document to users that {{RecordHeader}} is not safe for concurrent access, even read-only.
h2. Possible solution #2

Add concurrency primitives to the current implementation:
* Simply adding {color:#0747a6}*{{synchronized}}*{color} to the method *{color:#0747a6}{{key()}}{color}* (and to *{color:#0747a6}{{value()}}{color}* too) gives correct behaviour and avoids the race conditions.
* A JMH benchmark comparing *{color:#0747a6}{{key()}}{color}* with and without {color:#0747a6}*{{synchronized}}*{color} showed no significant performance penalty (the difference of about 0.5 ns/op is well within the error margin):
{code}
Benchmark                              Mode  Cnt   Score   Error  Units
RecordHeaderBenchmark.key              avgt    15  31.308 ± 7.862  ns/op
RecordHeaderBenchmark.synchronizedKey  avgt    15  31.853 ± 7.096  ns/op
{code}
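A minimal sketch of solution #2, applied to a hypothetical {{SyncHeader}} class (not the actual Kafka code or patch) whose lazy {{key()}} and {{value()}} follow the 2.8.0 pattern with {{synchronized}} added; the exact body of {{value()}} here is an assumption. Because the null check, the decode and the buffer release all run under the header's monitor, a second thread either waits or sees the already-initialized field, and releasing the monitor publishes the write. The {{countFailures}} loop has the same shape as the reproduction test and should always report zero.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical header (not Kafka code): lazy init as in 2.8.0, plus synchronized.
class SyncHeader {
    private String key;
    private ByteBuffer keyBuffer;
    private byte[] value;
    private ByteBuffer valueBuffer;

    SyncHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
        this.keyBuffer = keyBuffer;
        this.valueBuffer = valueBuffer;
    }

    // Check, decode and buffer release form one atomic step under this header's lock.
    synchronized String key() {
        if (key == null) {
            key = StandardCharsets.UTF_8.decode(keyBuffer.duplicate()).toString();
            keyBuffer = null;
        }
        return key;
    }

    // Same idea for the value: copy the bytes once, then drop the buffer.
    synchronized byte[] value() {
        if (value == null && valueBuffer != null) {
            ByteBuffer copy = valueBuffer.duplicate();
            value = new byte[copy.remaining()];
            copy.get(value);
            valueBuffer = null;
        }
        return value;
    }
}

class SyncHeaderStress {
    // Two threads race on key() of a fresh header each iteration; returns wrong results seen.
    static int countFailures(int iterations) {
        ByteBuffer keyBuffer = ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8));
        ByteBuffer valueBuffer = ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8));
        ExecutorService executor = Executors.newSingleThreadExecutor();
        int failures = 0;
        try {
            for (int i = 0; i < iterations; i++) {
                SyncHeader header = new SyncHeader(keyBuffer, valueBuffer);
                Future<String> other = executor.submit(header::key);
                if (!"key".equals(header.key())) failures++;
                if (!"key".equals(other.get())) failures++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
        return failures;
    }
}
{code}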

--
This message was sent by Atlassian Jira
(v8.3.4#803005)