Posted to dev@kafka.apache.org by "Oleksandr Konopko (JIRA)" <ji...@apache.org> on 2018/06/11 10:34:00 UTC

[jira] [Created] (KAFKA-7035) Kafka Processor's init() method sometimes is not called

Oleksandr Konopko created KAFKA-7035:
----------------------------------------

             Summary: Kafka Processor's init() method sometimes is not called
                 Key: KAFKA-7035
                 URL: https://issues.apache.org/jira/browse/KAFKA-7035
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.0.0
            Reporter: Oleksandr Konopko


Scenario:

1. We process a Kafka topic with an application implemented using the Processor API

2. We want to collect metrics (for simplicity, let's say we just count the number of processed entities)

3. How we tried to implement this:
 * process data in the process() method and forward it downstream via the context
 * update the counter on each call of process()
 * schedule a punctuate function that sends a metric, built from the counter, to a dedicated metrics topic
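The setup in step 3 can be illustrated with a minimal plain-Java model. This is a hypothetical sketch, not the attached code and not the Kafka Streams API: `CountingProcessor`, its `punctuate()` method, and the `List<Long>` standing in for the metrics topic are illustrative assumptions. In the real Processor API the punctuation is registered from init() through the ProcessorContext. Here init() just sets a flag, and the counter is reset after each report (an assumption consistent with expecting the metric values to sum to the total).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the processor described in step 3.
// It does NOT use the Kafka Streams classes; all names are illustrative.
class CountingProcessor {
    private final List<Long> metricsTopic;               // stand-in for the metrics topic
    private final List<String> sink = new ArrayList<>(); // stand-in for the downstream Sink
    private long counter = 0;
    private boolean punctuationScheduled = false;

    CountingProcessor(List<Long> metricsTopic) {
        this.metricsTopic = metricsTopic;
    }

    // Models init(): in the real API this is where punctuation gets scheduled.
    void init() {
        punctuationScheduled = true;
    }

    // Models process(): forward the record downstream and update the counter.
    void process(String key, String value) {
        sink.add(value);
        counter++;
    }

    // Models the scheduled punctuator: report the counter as a metric, then reset it.
    void punctuate() {
        if (!punctuationScheduled) {
            return; // init() never ran, so no punctuation exists and nothing is reported
        }
        metricsTopic.add(counter);
        counter = 0;
    }

    int forwardedCount() {
        return sink.size();
    }
}

class CountingDemo {
    public static void main(String[] args) {
        List<Long> metrics = new ArrayList<>();
        CountingProcessor p = new CountingProcessor(metrics);
        p.init();
        for (int i = 0; i < 3; i++) {
            p.process("key-" + i, "value-" + i);
        }
        p.punctuate();
        System.out.println(p.forwardedCount() + " forwarded, metrics " + metrics);
        // prints: 3 forwarded, metrics [3]
    }
}
```

In this model, calling `punctuate()` on an instance whose init() never ran emits nothing even though its records were forwarded, which mirrors what the report describes for ProcessorA and ProcessorB.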

The code is attached (all business-sensitive parts have been removed, so it should be easy to read).

 

Problematic Kafka Streams behaviour that I observed by logging every step:

1. We have 800000 messages in the input topic

2. Kafka Streams creates 4 Processor instances. Let's call them ProcessorA, ProcessorB, ProcessorC and ProcessorD

3. ProcessorA and ProcessorB receive 1-5% of the data. The data is processed correctly, the results are sent downstream, and the counter is updated

4. The init() method has not been called for ProcessorA or ProcessorB

5. ProcessorC and ProcessorD are created and start to receive the rest of the data (95-99%)

6. The init() method is called for both ProcessorC and ProcessorD. It schedules the punctuation, which periodically creates a Metrics message and sends it down the metrics stream

7. ProcessorA and ProcessorB are closed. Since init() was never called for them, no punctuation was scheduled, so their Metrics entities were never sent to the metrics topic

8. Processing is finished.

 

In the end:

Expected:
 * 800000 entities were processed and sent to the Sink
 * Metrics entities contain counters which sum up to 800000

Actual results:
 * 800000 entities were processed and sent to the Sink
 * Metrics entities contain counters which sum up to some number 3-6% less than 800000, for example 786543
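The gap between the expected and actual totals can be sketched with another hypothetical model. `LifecycleSimulation`, the 1000-record total and the 20/20/480/480 split are made up for illustration; they are not the reporter's numbers. Every instance forwards and counts its records, but only instances whose init() ran report a metric. The model optimistically assumes that ProcessorC and ProcessorD get one last punctuation after their final record.

```java
// Hypothetical model of the run described in steps 1-8. The split and the
// init() flags are illustrative inputs, not measurements from the report.
class LifecycleSimulation {

    // Returns {records forwarded to the sink, sum of all reported metric values}.
    static long[] run(long[] recordsPerInstance, boolean[] initCalled) {
        long forwarded = 0;
        long metricsSum = 0;
        for (int i = 0; i < recordsPerInstance.length; i++) {
            long counter = 0;
            for (long r = 0; r < recordsPerInstance[i]; r++) {
                forwarded++; // process(): the record always reaches the sink
                counter++;   // ...and the counter is always updated
            }
            // Only an instance whose init() ran has a punctuation that reports
            // its counter (assumed here to fire once more after the last record).
            if (initCalled[i]) {
                metricsSum += counter;
            }
        }
        return new long[] {forwarded, metricsSum};
    }

    public static void main(String[] args) {
        // ProcessorA..ProcessorD; A and B get a small share and no init() call.
        long[] split = {20, 20, 480, 480};
        boolean[] init = {false, false, true, true};
        long[] result = run(split, init);
        System.out.println("forwarded=" + result[0] + " metricsSum=" + result[1]);
        // prints: forwarded=1000 metricsSum=960
    }
}
```

With init() set to true for all four instances the two totals match, which corresponds to the "Expected" case.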

 

Problem:
 * the init() method is not guaranteed to be called
 * there is no way to guarantee that the punctuate method has finished all its work before close() is called
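The second problem, work the punctuator has not yet done when close() arrives, can be sketched the same way. In this hypothetical `TailLossModel`, punctuation fires every `interval` records as a stand-in for a wall-clock schedule, so anything counted after the last punctuation is dropped at close(). The `flushOnClose` flag only shows what would close the gap inside the model; it makes no claim about what the real close() is allowed to do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: counts accumulated after the last punctuation are lost
// if nothing reports them before close(). Not the Kafka Streams API.
class TailLossModel {

    // Processes `records` records, punctuating after every `interval` records
    // (a stand-in for a periodic schedule). Returns the metrics emitted.
    static List<Long> run(int records, int interval, boolean flushOnClose) {
        List<Long> metricsTopic = new ArrayList<>();
        long counter = 0;
        for (int i = 1; i <= records; i++) {
            counter++;                   // process(): update the counter
            if (i % interval == 0) {     // scheduled punctuation fires
                metricsTopic.add(counter);
                counter = 0;
            }
        }
        // close(): without an explicit flush, the remaining count is discarded.
        if (flushOnClose && counter > 0) {
            metricsTopic.add(counter);
        }
        return metricsTopic;
    }

    static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sum(run(1050, 100, false))); // prints 1000: the last 50 are lost
        System.out.println(sum(run(1050, 100, true)));  // prints 1050
    }
}
```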

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)