Posted to issues@camel.apache.org by "yogananth mahalingam (JIRA)" <ji...@apache.org> on 2015/09/29 09:25:04 UTC

[jira] [Issue Comment Deleted] (CAMEL-9182) camel-kafka : Kafka Endpoint executor threadpool is not initialized based on consumercount

     [ https://issues.apache.org/jira/browse/CAMEL-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yogananth mahalingam updated CAMEL-9182:
----------------------------------------
    Comment: was deleted

(was: Hi [~davsclaus] - 

Correct me if I am wrong. My interpretation of the code snippet below from KafkaConsumer is:
* consumersCount - creates multiple connections and consumes messages as if multiple endpoints were configured.
* consumerStreams - determines the number of KafkaStreams retrieved by each ConsumerConnector.

The number of consumer tasks created matches consumersCount,
so I expected the executor thread pool to be initialized based on consumersCount.

{code}
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        log.info("Starting Kafka consumer");

        executor = endpoint.createExecutor();
        for (int i = 0; i < endpoint.getConsumersCount(); i++) {
            ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());

            // commit periodically
            if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
                if ((endpoint.getConsumerTimeoutMs() == null || endpoint.getConsumerTimeoutMs() < 0)
                        && endpoint.getConsumerStreams() > 1) {
                    LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams.");
                }
                CyclicBarrier barrier = new CyclicBarrier(endpoint.getConsumerStreams(), new CommitOffsetTask(consumer));
                for (final KafkaStream<byte[], byte[]> stream : streams) {
                    executor.submit(new BatchingConsumerTask(stream, barrier));
                }
                consumerBarriers.put(consumer, barrier);
            } else {
                // auto commit
                for (final KafkaStream<byte[], byte[]> stream : streams) {
                    executor.submit(new AutoCommitConsumerTask(consumer, stream));
                }
                consumerBarriers.put(consumer, null);
            }
        }

    }
{code})

> camel-kafka : Kafka Endpoint executor threadpool is not initialized based on consumercount
> ------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-9182
>                 URL: https://issues.apache.org/jira/browse/CAMEL-9182
>             Project: Camel
>          Issue Type: Bug
>    Affects Versions: 2.15.0
>            Reporter: yogananth mahalingam
>            Assignee: Claus Ibsen
>
> The Kafka endpoint's executor thread pool is expected to be sized based on the consumer count,
> which would allow multiple consumers to run concurrently.
> Instead, it is initialized based on consumerStreams.
> With consumer count = 10 and consumer streams = 1, messages are consumed sequentially.
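
The reported behaviour can be reproduced without Kafka. The sketch below is not Camel code: the class name PoolSizingSketch and the method peakConcurrency are hypothetical, and a short sleep stands in for a consumer task blocking on its stream. It submits one task per consumer to a fixed thread pool and records how many tasks were ever running at the same time, first with the pool sized by consumerStreams (1) and then sized by the total number of tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; none of these names exist in camel-kafka.
class PoolSizingSketch {

    /** Runs the given number of blocking tasks on a fixed pool and returns the peak number running at once. */
    static int peakConcurrency(int poolSize, int tasks) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    // Assumption: a short sleep stands in for a task reading from its stream.
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }

        // Stop accepting work and wait for the submitted tasks to finish.
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int consumersCount = 10;
        int consumerStreams = 1;
        int totalTasks = consumersCount * consumerStreams;

        System.out.println("pool sized by consumerStreams: peak = "
                + peakConcurrency(consumerStreams, totalTasks));
        System.out.println("pool sized by consumersCount * consumerStreams: peak = "
                + peakConcurrency(totalTasks, totalTasks));
    }
}
```

With a pool of one thread, the remaining tasks wait in the executor's queue until the running one finishes, so no two tasks ever overlap. A real consumer task would normally block on its stream far longer than the sleep used here, so the queued consumers could wait much longer. Sizing the pool by consumersCount * consumerStreams is one possible remedy suggested by the snippet above, not necessarily the fix that was applied to this issue.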



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)