Posted to jira@kafka.apache.org by "Bozhidar Bozhanov (Jira)" <ji...@apache.org> on 2020/09/10 10:18:00 UTC

[jira] [Created] (KAFKA-10474) Kafka Java client introduces CPU overhead when there are many consumers

Bozhidar Bozhanov created KAFKA-10474:
-----------------------------------------

             Summary: Kafka Java client introduces CPU overhead when there are many consumers
                 Key: KAFKA-10474
                 URL: https://issues.apache.org/jira/browse/KAFKA-10474
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.4.1
            Reporter: Bozhidar Bozhanov


We are using the Kafka Java client (version 2.4.1) and started noticing a gradual increase in the CPU usage of our production instances. The usage was not correlated with load; instead, it increased in steps whenever we acquired a new customer. We realized the likely cause is the steadily growing number of consumers, as each registration adds one consumer per account by default.

 

We did some profiling and, by running a local sample with 3000 consumers, noticed that the JmxReporter generates some CPU overhead. Unfortunately, there's no way to disable it, so we created a class with the same name and package on the classpath and let the classloader pick it up first. That alone led to a drop from 60% to 40% sustained CPU usage.
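
As a rough way to see how much the JmxReporter registers per process, the sketch below counts the MBeans in a JMX domain using only the JDK. It assumes it runs inside the JVM that hosts the consumers and that the consumer metrics are registered under the `kafka.consumer` domain; the class and method names are hypothetical and not part of the Kafka client.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper, not part of the Kafka client.
final class KafkaMBeanCount {

    /** Counts the MBeans registered in the platform MBean server under the given JMX domain. */
    static int countMBeans(String domain) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            // "<domain>:*" matches every MBean of that domain, whatever its key properties are.
            Set<ObjectName> names = server.queryNames(new ObjectName(domain + ":*"), null);
            return names.size();
        } catch (MalformedObjectNameException ex) {
            throw new IllegalArgumentException("Invalid JMX domain: " + domain, ex);
        }
    }

    public static void main(String[] args) {
        // Assumption: the consumer metrics live in the "kafka.consumer" domain.
        System.out.println("kafka.consumer MBeans: " + countMBeans("kafka.consumer"));
    }
}
```

Calling `countMBeans("kafka.consumer")` before and after creating a batch of consumers gives an idea of how the number of registered MBeans grows with the consumer count.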

 

We then changed the application to create consumers only for active customers (so no more idle consumers). With the idle consumers removed, CPU usage dropped to 8% sustained.
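
The idea of keeping consumers only for active customers can be sketched roughly as follows. This is not our actual code: `ActiveConsumerRegistry`, `markActive` and `markIdle` are hypothetical names, and the `starter` function stands in for whatever starts a poll loop and returns a handle that stops it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: keeps one running consumer per active account and none for idle ones.
final class ActiveConsumerRegistry<K> {

    // Handle of the running consumer per account; closing the handle stops that consumer.
    private final Map<K, AutoCloseable> running = new ConcurrentHashMap<>();
    private final Function<K, AutoCloseable> starter;

    ActiveConsumerRegistry(Function<K, AutoCloseable> starter) {
        this.starter = starter;
    }

    /** Starts a consumer for the account unless one is already running. */
    void markActive(K accountId) {
        running.computeIfAbsent(accountId, starter);
    }

    /** Stops and forgets the consumer of an account that became idle. */
    void markIdle(K accountId) {
        AutoCloseable handle = running.remove(accountId);
        if (handle == null) {
            return; // nothing was running for this account
        }
        try {
            handle.close();
        } catch (Exception ex) {
            throw new IllegalStateException("Failed to stop consumer for " + accountId, ex);
        }
    }

    int runningCount() {
        return running.size();
    }
}
```

In a real application the handle returned by `starter` would typically call `KafkaConsumer.wakeup()` on the consumer owned by the poll thread, so that the blocked `poll()` returns and the thread can close the consumer.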

Apart from the JMX metrics reporter, I don't know what contributes to idle consumers generating such CPU overhead. If it helps, our consumer does this:

{code:java}
try (KafkaConsumer<Integer, PendingAuditLogEntry> consumer = new KafkaConsumer<>(props)) {
	consumer.subscribe(Collections.singletonList(applicationId.toString()));
	while (true) {
		try {
			synchronized (applicationId.toString().intern()) {
				
				ConsumerRecords<Integer, PendingAuditLogEntry> messages = consumer.poll(Duration.ofMillis(2000));
				List<PendingAuditLogEntry> entries = StreamSupport.stream(messages.spliterator(), false)
						.map(ConsumerRecord::value).collect(Collectors.toList());
				// idle consumers don't get here, because they have no entries to process
				if (!entries.isEmpty()) {
					try {
						// logic
						consumer.commitSync();
					} catch (ExecutionException ex) {
						// revert to the last successful batch start offset
						// poll() advances its internal position, which is different from the committed offset
						consumer.committed(consumer.assignment()).forEach(consumer::seek);
					}
				}
			}
			// we want to process more records at once (due to business logic specifics) so we sleep for a preconfigured period of time, in our case - 10 seconds
			Thread.sleep(kafkaSleepMillis);
		} catch (Exception ex) {
			logger.error("Exception while trying to get data from kafka consumer for appId={}, stopping.", applicationId, ex);
			break;
		}
	}
} catch (Exception ex) {
	logger.error("Failed to run kafka consumer", ex);
}{code}
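
To narrow down which threads of the idle consumers burn CPU, one option is to sum the CPU time per thread-name prefix with the JDK's `ThreadMXBean`. The sketch below is only a diagnostic aid under assumptions: `ThreadCpuByPrefix` is a hypothetical helper, and the prefix `kafka-coordinator-heartbeat-thread` for the client's heartbeat threads should be verified against a thread dump of the affected process.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical diagnostic helper, not part of the Kafka client.
final class ThreadCpuByPrefix {

    /**
     * Sums the CPU time (in nanoseconds) used so far by live threads whose name
     * starts with the given prefix. Returns -1 if the JVM cannot measure it.
     */
    static long cpuNanos(String namePrefix) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        if (!threads.isThreadCpuTimeSupported() || !threads.isThreadCpuTimeEnabled()) {
            return -1L;
        }
        long total = 0L;
        for (long id : threads.getAllThreadIds()) {
            ThreadInfo info = threads.getThreadInfo(id);
            if (info == null || !info.getThreadName().startsWith(namePrefix)) {
                continue; // thread already ended, or it is not one we are interested in
            }
            long cpu = threads.getThreadCpuTime(id);
            if (cpu > 0) {
                total += cpu; // getThreadCpuTime returns -1 for threads that are no longer alive
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumption: heartbeat threads use this name prefix; check a thread dump to confirm.
        System.out.println(cpuNanos("kafka-coordinator-heartbeat-thread"));
    }
}
```

Sampling the value twice, a fixed interval apart, and comparing the difference across prefixes shows which group of threads accounts for the CPU time during that interval.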
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)