Posted to jira@kafka.apache.org by "Bozhidar Bozhanov (Jira)" <ji...@apache.org> on 2020/09/10 10:19:00 UTC

[jira] [Updated] (KAFKA-10474) Kafka Java client introduces CPU overhead when there are many consumers

     [ https://issues.apache.org/jira/browse/KAFKA-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bozhidar Bozhanov updated KAFKA-10474:
--------------------------------------
    Description: 
We are using the Kafka Java client (version 2.4.1) and started noticing a gradual increase in the CPU usage of our production instances. The usage was not correlated with load; instead, it increased in steps whenever we acquired a new customer. We realized the likely cause is the steadily growing number of consumers, as each registration adds one consumer per account by default.

 

We did some profiling, and by running a local sample with 3000 consumers we noticed that the JmxReporter generates some CPU overhead. Unfortunately, there's no way to disable it, so we created a class with the same name and package on the classpath and let the classloader pick it up first. That alone led to a drop from 60% to 40% sustained CPU usage.
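One way to gauge how much the JMX reporter has to manage is to count the MBeans registered under the client's JMX domain. The following is a minimal sketch using only the JDK; the class name `MBeanCounter` is hypothetical, and the `kafka.consumer:*` pattern mentioned afterwards assumes the consumer metrics are registered under the `kafka.consumer` domain.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper: counts MBeans on the platform MBean server that match a pattern. */
final class MBeanCounter {
    private MBeanCounter() {}

    /**
     * @param pattern a JMX ObjectName pattern, for example "java.lang:*"
     * @return the number of matching MBeans currently registered in this JVM
     */
    static int count(String pattern) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            // A null query expression means "match on the name pattern only".
            return server.queryNames(new ObjectName(pattern), null).size();
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid ObjectName pattern: " + pattern, e);
        }
    }
}
```

Calling `MBeanCounter.count("kafka.consumer:*")` inside the application before and after starting the consumers would give a rough measure of how the number of registrations scales with the number of consumers.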

 

We then changed our code to create consumers only for active customers (so no more idle consumers). Once the idle consumers were removed, CPU usage dropped to 8% sustained.
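Creating consumers only for active customers could be sketched roughly as below. This is not our actual implementation: `ActiveConsumerRegistry`, `activate`, `deactivate`, and the `loopFactory` parameter are hypothetical names, and the factory is assumed to return a `Runnable` that runs one customer's consumer loop.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical registry that runs a consumer loop only while a customer is active. */
final class ActiveConsumerRegistry {
    private final ConcurrentMap<String, Future<?>> running = new ConcurrentHashMap<>();
    private final ExecutorService executor;
    private final Function<String, Runnable> loopFactory;

    ActiveConsumerRegistry(ExecutorService executor, Function<String, Runnable> loopFactory) {
        this.executor = executor;
        this.loopFactory = loopFactory;
    }

    /** Starts a loop for the customer unless one is already registered. */
    void activate(String applicationId) {
        running.computeIfAbsent(applicationId, id -> executor.submit(loopFactory.apply(id)));
    }

    /** Interrupts the customer's loop; returns true only if a still-running loop was cancelled. */
    boolean deactivate(String applicationId) {
        Future<?> task = running.remove(applicationId);
        return task != null && task.cancel(true);
    }

    /** Number of customers that currently have a registered loop. */
    int activeCount() {
        return running.size();
    }
}
```

With a loop like ours, the interruption triggered by `deactivate` should end up in the outer `catch (Exception ex)` block, which logs and breaks out, after which the try-with-resources closes the consumer. Calling `consumer.wakeup()` from another thread is an alternative way to stop a blocked `poll()`.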

Apart from the JMX metrics reporter, I don't know what contributes to idle consumers generating such CPU overhead. If it helps, our consumer does this:

 

 
{code:java}
try (KafkaConsumer<Integer, Message> consumer = new KafkaConsumer<>(props)) {
	consumer.subscribe(Collections.singletonList(applicationId.toString()));
	while (true) {
		try {
			synchronized (applicationId.toString().intern()) {
				ConsumerRecords<Integer, Message> messages = consumer.poll(Duration.ofMillis(2000));
				List<Message> entries = StreamSupport.stream(messages.spliterator(), false)
						.map(ConsumerRecord::value).collect(Collectors.toList());
				// idle consumers don't get here, because they have no entries to process
				if (!entries.isEmpty()) {
					try {
						// logic
						consumer.commitSync();
					} catch (ExecutionException ex) {
						// revert to the last successful batch start offset
						// poll() advances its internal position, which is different from the committed offset
						consumer.committed(consumer.assignment()).forEach(consumer::seek);
					}
				}
			}
			// we want to process more records at once (due to business logic specifics) so we sleep for a preconfigured period of time, in our case - 10 seconds
			Thread.sleep(kafkaSleepMillis);
		} catch (Exception ex) {
			logger.error("Exception while trying to get data from kafka consumer for appId={}, stopping.", applicationId, ex);
			break;
		}
	}
} catch (Exception ex) {
	logger.error("Failed to run kafka consumer", ex);
}{code}
Each consumer runs in a separate thread in a cached executor service.
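To illustrate what "a separate thread in a cached executor service" implies for the thread count: a cached pool has no upper bound and hands every task that finds no idle worker its own new thread, so long-running loops end up with one thread each. The sketch below uses only the JDK; `CachedPoolDemo` and `threadsFor` are hypothetical names, and the blocking tasks merely stand in for consumer loops.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: long-running tasks in a cached pool get one thread each. */
final class CachedPoolDemo {
    private CachedPoolDemo() {}

    /** Submits {@code loops} tasks that block until released and returns the pool size. */
    static int threadsFor(int loops) {
        // Same parameters that Executors.newCachedThreadPool() is documented to use.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < loops; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // stands in for a never-ending poll loop
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the stand-in loops finish
            pool.shutdown();
        }
    }
}
```

With 3000 consumers, as in our local sample, this model means 3000 threads, each waking up at least once per poll-and-sleep cycle even when its topic is idle.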



> Kafka Java client introduces CPU overhead when there are many consumers
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-10474
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10474
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.4.1
>            Reporter: Bozhidar Bozhanov
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)