Posted to issues@camel.apache.org by "yogananth mahalingam (JIRA)" <ji...@apache.org> on 2015/09/29 09:25:04 UTC
[jira] [Issue Comment Deleted] (CAMEL-9182) camel-kafka : Kafka
Endpoint executor threadpool is not initialized based on consumercount
[ https://issues.apache.org/jira/browse/CAMEL-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yogananth mahalingam updated CAMEL-9182:
----------------------------------------
Comment: was deleted
(was: Hi [~davsclaus] -
Correct me if I am wrong. My interpretation of the code snippet below, from KafkaConsumer, is:
* consumersCount - creates multiple connections and consumes messages as if multiple endpoints were configured.
* consumerStreams - determines the number of KafkaStreams retrieved by each ConsumerConnector.
The number of consumer tasks created matches consumersCount.
I therefore expected the executor thread pool to be sized based on consumersCount.
{code}
@Override
protected void doStart() throws Exception {
    super.doStart();
    log.info("Starting Kafka consumer");
    executor = endpoint.createExecutor();
    for (int i = 0; i < endpoint.getConsumersCount(); i++) {
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
        // commit periodically
        if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
            if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs() < 0)
                    && endpoint.getConsumerStreams() > 1) {
                LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams.");
            }
            CyclicBarrier barrier = new CyclicBarrier(endpoint.getConsumerStreams(), new CommitOffsetTask(consumer));
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new BatchingConsumerTask(stream, barrier));
            }
            consumerBarriers.put(consumer, barrier);
        } else {
            // auto commit
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new AutoCommitConsumerTask(consumer, stream));
            }
            consumerBarriers.put(consumer, null);
        }
    }
}
{code})
> camel-kafka : Kafka Endpoint executor threadpool is not initialized based on consumercount
> ------------------------------------------------------------------------------------------
>
> Key: CAMEL-9182
> URL: https://issues.apache.org/jira/browse/CAMEL-9182
> Project: Camel
> Issue Type: Bug
> Affects Versions: 2.15.0
> Reporter: yogananth mahalingam
> Assignee: Claus Ibsen
>
> The Kafka endpoint's executor thread pool is expected to be sized based on the consumer count.
> This would enable multiple consumers to run concurrently.
> Instead, it is sized based on consumerStreams.
> With consumer count = 10 and consumer streams = 1, messages are consumed sequentially.
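
The effect described above can be reproduced without Kafka. The following is a minimal sketch, not the camel-kafka implementation: it assumes the executor is a fixed thread pool and uses a short sleep as a stand-in for a blocking stream read. The class name PoolSizingSketch and the method peakConcurrency are hypothetical, introduced only for illustration. It submits consumersCount * consumerStreams tasks and reports how many ran at the same time for two pool sizes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; none of these names come from camel-kafka.
public class PoolSizingSketch {

    /**
     * Submits taskCount blocking tasks to a fixed pool of poolSize threads and
     * returns the highest number of tasks observed running at the same time.
     */
    static int peakConcurrency(int poolSize, int taskCount) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            executor.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(50); // assumption: stands in for a blocking stream read
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int consumersCount = 10;
        int consumerStreams = 1;
        // One task is submitted per stream of each consumer connection.
        int tasks = consumersCount * consumerStreams;

        // Pool sized by consumerStreams, as reported in this issue.
        System.out.println("pool = consumerStreams: peak = "
                + peakConcurrency(consumerStreams, tasks));
        // Pool sized by the total number of submitted tasks.
        System.out.println("pool = consumersCount * consumerStreams: peak = "
                + peakConcurrency(tasks, tasks));
    }
}
```

With the pool sized by consumerStreams = 1, only one task can run at a time, so the ten tasks execute one after another. Sizing the pool by consumersCount * consumerStreams lets every submitted task have its own thread; whether that is the right fix for camel-kafka is for the maintainers to decide.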
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)